This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c100d815d7 [ISSUE #7328] Convergent thread pool creation (#7329)
c100d815d7 is described below

commit c100d815d754d7cb330bc63e145bafd2d9b59cb1
Author: guyinyou <[email protected]>
AuthorDate: Mon Sep 11 10:13:56 2023 +0800

    [ISSUE #7328] Convergent thread pool creation (#7329)
    
    * Converge thread pool creation to facilitate subsequent iteration management
    
    * Converge thread pool creation in ThreadPoolMonitor.java
    
    * fix unit test
    
    * Convergence ThreadPool constructor
    
    * Convergence ScheduledThreadPool constructor
    
    * remove unused import
    
    * Convergence ScheduledThreadPool constructor
    
    * remove unused import
    
    ---------
---
 .../apache/rocketmq/broker/BrokerController.java   | 39 ++++++------
 .../broker/client/ClientHousekeepingService.java   |  4 +-
 .../client/DefaultConsumerIdsChangeListener.java   |  3 +-
 .../broker/controller/ReplicasManager.java         |  9 ++-
 .../broker/dledger/DLedgerRoleChangeHandler.java   |  4 +-
 .../rocketmq/broker/failover/EscapeBridge.java     |  4 +-
 .../rocketmq/broker/latency/BrokerFastFailure.java |  5 +-
 .../latency/BrokerFixedThreadPoolExecutor.java     | 57 -----------------
 .../rocketmq/broker/latency/FutureTaskExt.java     | 39 ------------
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  7 +-
 .../broker/schedule/ScheduleMessageService.java    |  7 +-
 .../broker/topic/TopicRouteInfoManager.java        |  4 +-
 .../AbstractTransactionalMessageCheckListener.java |  4 +-
 .../rocketmq/broker/BrokerControllerTest.java      |  2 +-
 .../broker/latency/BrokerFastFailureTest.java      |  1 +
 .../common/config/AbstractRocksDBStorage.java      |  6 +-
 .../thread/FutureTaskExtThreadPoolExecutor.java    |  3 +-
 .../rocketmq/common/thread/ThreadPoolMonitor.java  |  6 +-
 .../apache/rocketmq/common/utils/ThreadUtils.java  | 74 +++++++++++++++++++---
 .../apache/rocketmq/container/BrokerContainer.java |  6 +-
 .../rocketmq/controller/ControllerManager.java     | 14 ++--
 .../controller/impl/DLedgerController.java         | 10 +--
 .../heartbeat/DefaultBrokerHeartbeatManager.java   |  3 +-
 .../apache/rocketmq/namesrv/NamesrvController.java | 22 ++-----
 .../proxy/grpc/v2/channel/GrpcChannelManager.java  |  6 +-
 .../proxy/remoting/RemotingProtocolServer.java     |  4 +-
 .../proxy/service/ClusterServiceManager.java       | 12 ++--
 .../proxy/service/LocalServiceManager.java         |  4 +-
 .../receipt/DefaultReceiptHandleManager.java       |  8 +--
 .../proxy/service/route/TopicRouteService.java     |  9 ++-
 .../remoting/netty/NettyRemotingClient.java        |  4 +-
 .../remoting/netty/NettyRemotingServer.java        |  4 +-
 .../apache/rocketmq/store/DefaultMessageStore.java |  8 +--
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 38 ++++++-----
 .../apache/rocketmq/store/kv/CompactionStore.java  | 21 +++---
 .../rocketmq/store/queue/ConsumeQueueStore.java    |  4 +-
 .../rocketmq/store/stats/BrokerStatsManager.java   | 14 ++--
 .../rocketmq/store/timer/TimerMessageStore.java    |  6 +-
 .../org/apache/rocketmq/test/util/StatUtil.java    |  1 -
 .../tieredstore/common/TieredStoreExecutor.java    | 14 ++--
 .../tools/admin/DefaultMQAdminExtImpl.java         |  3 +-
 41 files changed, 215 insertions(+), 278 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6aba70cb21..275b64b1ab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -34,7 +34,6 @@ import org.apache.rocketmq.broker.failover.EscapeBridge;
 import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.broker.latency.BrokerFastFailure;
-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
 import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
 import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -98,6 +97,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.stats.MomentStatsItem;
 import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.Configuration;
@@ -160,7 +160,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -455,10 +454,10 @@ public class BrokerController {
      * Initialize resources including remoting server and thread executors.
      */
     protected void initializeResources() {
-        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+        this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("BrokerControllerScheduledThread", true, 
getBrokerIdentity()));
 
-        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
+        this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getSendMessageThreadPoolNums(),
             this.brokerConfig.getSendMessageThreadPoolNums(),
             1000 * 60,
@@ -466,7 +465,7 @@ public class BrokerController {
             this.sendThreadPoolQueue,
             new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
 
-        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
+        this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getPullMessageThreadPoolNums(),
             this.brokerConfig.getPullMessageThreadPoolNums(),
             1000 * 60,
@@ -474,7 +473,7 @@ public class BrokerController {
             this.pullThreadPoolQueue,
             new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));
 
-        this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor(
+        this.litePullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getLitePullMessageThreadPoolNums(),
             this.brokerConfig.getLitePullMessageThreadPoolNums(),
             1000 * 60,
@@ -482,7 +481,7 @@ public class BrokerController {
             this.litePullThreadPoolQueue,
             new ThreadFactoryImpl("LitePullMessageThread_", 
getBrokerIdentity()));
 
-        this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
+        this.putMessageFutureExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getPutMessageFutureThreadPoolNums(),
             this.brokerConfig.getPutMessageFutureThreadPoolNums(),
             1000 * 60,
@@ -490,7 +489,7 @@ public class BrokerController {
             this.putThreadPoolQueue,
             new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
 
-        this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
+        this.ackMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getAckMessageThreadPoolNums(),
             this.brokerConfig.getAckMessageThreadPoolNums(),
             1000 * 60,
@@ -498,7 +497,7 @@ public class BrokerController {
             this.ackThreadPoolQueue,
             new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));
 
-        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
+        this.queryMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getQueryMessageThreadPoolNums(),
             this.brokerConfig.getQueryMessageThreadPoolNums(),
             1000 * 60,
@@ -506,7 +505,7 @@ public class BrokerController {
             this.queryThreadPoolQueue,
             new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));
 
-        this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor(
+        this.adminBrokerExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getAdminBrokerThreadPoolNums(),
             this.brokerConfig.getAdminBrokerThreadPoolNums(),
             1000 * 60,
@@ -514,7 +513,7 @@ public class BrokerController {
             this.adminBrokerThreadPoolQueue,
             new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));
 
-        this.clientManageExecutor = new BrokerFixedThreadPoolExecutor(
+        this.clientManageExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getClientManageThreadPoolNums(),
             this.brokerConfig.getClientManageThreadPoolNums(),
             1000 * 60,
@@ -522,7 +521,7 @@ public class BrokerController {
             this.clientManagerThreadPoolQueue,
             new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));
 
-        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
+        this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getHeartbeatThreadPoolNums(),
             this.brokerConfig.getHeartbeatThreadPoolNums(),
             1000 * 60,
@@ -530,7 +529,7 @@ public class BrokerController {
             this.heartbeatThreadPoolQueue,
             new ThreadFactoryImpl("HeartbeatThread_", true, 
getBrokerIdentity()));
 
-        this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor(
+        this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getConsumerManageThreadPoolNums(),
             this.brokerConfig.getConsumerManageThreadPoolNums(),
             1000 * 60,
@@ -538,7 +537,7 @@ public class BrokerController {
             this.consumerManagerThreadPoolQueue,
             new ThreadFactoryImpl("ConsumerManageThread_", true, 
getBrokerIdentity()));
 
-        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
+        this.replyMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
             this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
             1000 * 60,
@@ -546,7 +545,7 @@ public class BrokerController {
             this.replyThreadPoolQueue,
             new ThreadFactoryImpl("ProcessReplyMessageThread_", 
getBrokerIdentity()));
 
-        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
+        this.endTransactionExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getEndTransactionThreadPoolNums(),
             this.brokerConfig.getEndTransactionThreadPoolNums(),
             1000 * 60,
@@ -554,7 +553,7 @@ public class BrokerController {
             this.endTransactionThreadPoolQueue,
             new ThreadFactoryImpl("EndTransactionThread_", 
getBrokerIdentity()));
 
-        this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor(
+        this.loadBalanceExecutor = ThreadUtils.newThreadPoolExecutor(
             this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
             this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
             1000 * 60,
@@ -562,9 +561,9 @@ public class BrokerController {
             this.loadBalanceThreadPoolQueue,
             new ThreadFactoryImpl("LoadBalanceProcessorThread_", 
getBrokerIdentity()));
 
-        this.syncBrokerMemberGroupExecutorService = new 
ScheduledThreadPoolExecutor(1,
+        this.syncBrokerMemberGroupExecutorService = 
ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", 
getBrokerIdentity()));
-        this.brokerHeartbeatExecutorService = new 
ScheduledThreadPoolExecutor(1,
+        this.brokerHeartbeatExecutorService = 
ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", 
getBrokerIdentity()));
 
         this.topicQueueMappingCleanService = new 
TopicQueueMappingCleanService(this);
@@ -828,8 +827,6 @@ public class BrokerController {
 
             initializeResources();
 
-            registerProcessor();
-
             initializeScheduledTasks();
 
             initialTransaction();
@@ -1690,6 +1687,8 @@ public class BrokerController {
                 }
             }
         }, 10, 5, TimeUnit.SECONDS);
+
+        registerProcessor();
     }
 
     protected void scheduleSendHeartbeat() {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index 98e5f450f3..cbb81f632b 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -18,11 +18,11 @@ package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -35,7 +35,7 @@ public class ClientHousekeepingService implements 
ChannelEventListener {
 
     public ClientHousekeepingService(final BrokerController brokerController) {
         this.brokerController = brokerController;
-        scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+        scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("ClientHousekeepingScheduledThread", 
brokerController.getBrokerIdentity()));
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index 2ce036a0ff..d17a2a5470 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -37,7 +36,7 @@ public class DefaultConsumerIdsChangeListener implements 
ConsumerIdsChangeListen
     private final BrokerController brokerController;
     private final int cacheSize = 8096;
 
-    private final ScheduledExecutorService scheduledExecutorService =  new 
ScheduledThreadPoolExecutor(1,
+    private final ScheduledExecutorService scheduledExecutorService =  
ThreadUtils.newScheduledThreadPool(1,
         
ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true));
 
     private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new 
ConcurrentHashMap<>(cacheSize);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 37c82e434b..a989e6e68f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -27,10 +27,8 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
@@ -42,6 +40,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.EpochEntry;
@@ -107,9 +106,9 @@ public class ReplicasManager {
     public ReplicasManager(final BrokerController brokerController) {
         this.brokerController = brokerController;
         this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
-        this.scheduledService = Executors.newScheduledThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ScheduledService_", 
brokerController.getBrokerIdentity()));
-        this.executorService = Executors.newFixedThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ExecutorService_", 
brokerController.getBrokerIdentity()));
-        this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+        this.scheduledService = ThreadUtils.newScheduledThreadPool(3, new 
ThreadFactoryImpl("ReplicasManager_ScheduledService_", 
brokerController.getBrokerIdentity()));
+        this.executorService = ThreadUtils.newThreadPoolExecutor(3, new 
ThreadFactoryImpl("ReplicasManager_ExecutorService_", 
brokerController.getBrokerIdentity()));
+        this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, 
TimeUnit.SECONDS,
             new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("ReplicasManager_scan_thread_", 
brokerController.getBrokerIdentity()));
         this.haService = (AutoSwitchHAService) 
brokerController.getMessageStore().getHaService();
         this.brokerConfig = brokerController.getBrokerConfig();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
index 75023ee1b8..e6cb97640b 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
@@ -21,12 +21,12 @@ import io.openmessaging.storage.dledger.DLedgerServer;
 import io.openmessaging.storage.dledger.MemberState;
 import io.openmessaging.storage.dledger.utils.DLedgerUtils;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -49,7 +49,7 @@ public class DLedgerRoleChangeHandler implements 
DLedgerLeaderElector.RoleChange
         this.messageStore = messageStore;
         this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
         this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
-        this.executorService = Executors.newSingleThreadExecutor(
+        this.executorService = ThreadUtils.newSingleThreadExecutor(
             new ThreadFactoryImpl("DLegerRoleChangeHandler_", 
brokerController.getBrokerIdentity()));
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java 
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 7c350fc1d7..6a08174801 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
@@ -43,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -72,7 +72,7 @@ public class EscapeBridge {
     public void start() throws Exception {
         if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() && 
brokerController.getBrokerConfig().isEnableRemoteEscape()) {
             final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new 
LinkedBlockingQueue<>(50000);
-            this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
+            this.defaultAsyncSenderExecutor = 
ThreadUtils.newThreadPoolExecutor(
                 Runtime.getRuntime().availableProcessors(),
                 Runtime.getRuntime().availableProcessors(),
                 1000 * 60,
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index d3d0bc8ba3..3b6e9dc676 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -18,13 +18,14 @@ package org.apache.rocketmq.broker.latency;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.netty.RequestTask;
@@ -43,7 +44,7 @@ public class BrokerFastFailure {
 
     public BrokerFastFailure(final BrokerController brokerController) {
         this.brokerController = brokerController;
-        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+        this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
                 brokerController == null ? null : 
brokerController.getBrokerConfig()));
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
deleted file mode 100644
index d2d1143a34..0000000000
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.latency;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int 
maximumPoolSize, final long keepAliveTime,
-        final TimeUnit unit,
-        final BlockingQueue<Runnable> workQueue) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
-    }
-
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int 
maximumPoolSize, final long keepAliveTime,
-        final TimeUnit unit,
-        final BlockingQueue<Runnable> workQueue, final ThreadFactory 
threadFactory) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
-    }
-
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int 
maximumPoolSize, final long keepAliveTime,
-        final TimeUnit unit,
-        final BlockingQueue<Runnable> workQueue, final 
RejectedExecutionHandler handler) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
handler);
-    }
-
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int 
maximumPoolSize, final long keepAliveTime,
-        final TimeUnit unit,
-        final BlockingQueue<Runnable> workQueue, final ThreadFactory 
threadFactory,
-        final RejectedExecutionHandler handler) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, handler);
-    }
-
-    @Override
-    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final 
T value) {
-        return new FutureTaskExt<>(runnable, value);
-    }
-}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
deleted file mode 100644
index f132efaebc..0000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.latency;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-public class FutureTaskExt<V> extends FutureTask<V> {
-    private final Runnable runnable;
-
-    public FutureTaskExt(final Callable<V> callable) {
-        super(callable);
-        this.runnable = null;
-    }
-
-    public FutureTaskExt(final Runnable runnable, final V result) {
-        super(runnable, result);
-        this.runnable = runnable;
-    }
-
-    public Runnable getRunnable() {
-        return runnable;
-    }
-}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index ae81e8b11d..9dfb8127d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -27,9 +27,9 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -59,6 +59,7 @@ import 
org.apache.rocketmq.common.namesrv.DefaultTopAddressing;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
@@ -144,7 +145,7 @@ public class BrokerOuterAPI {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final RemotingClient remotingClient;
     private final TopAddressing topAddressing = new 
DefaultTopAddressing(MixAll.getWSAddr());
-    private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new 
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
+    private final ExecutorService brokerOuterExecutor = 
ThreadUtils.newThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
             new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("brokerOutApi_thread_", true));
     private final ClientMetadata clientMetadata;
     private final RpcClient rpcClient;
@@ -1092,7 +1093,7 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public BrokerFixedThreadPoolExecutor getBrokerOuterExecutor() {
+    public ExecutorService getBrokerOuterExecutor() {
         return brokerOuterExecutor;
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 297b14207c..0c2e6507bd 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager {
     public ScheduleMessageService(final BrokerController brokerController) {
         this.brokerController = brokerController;
         this.enableAsyncDeliver = 
brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
-        scheduledPersistService = new ScheduledThreadPoolExecutor(1,
+        scheduledPersistService = ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, 
brokerController.getBrokerConfig()));
     }
 
@@ -134,9 +133,9 @@ public class ScheduleMessageService extends ConfigManager {
     public void start() {
         if (started.compareAndSet(false, true)) {
             this.load();
-            this.deliverExecutorService = new 
ScheduledThreadPoolExecutor(this.maxDelayLevel, new 
ThreadFactoryImpl("ScheduleMessageTimerThread_"));
+            this.deliverExecutorService = 
ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new 
ThreadFactoryImpl("ScheduleMessageTimerThread_"));
             if (this.enableAsyncDeliver) {
-                this.handleExecutorService = new 
ScheduledThreadPoolExecutor(this.maxDelayLevel, new 
ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
+                this.handleExecutorService = 
ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new 
ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
             }
             for (Map.Entry<Integer, Long> entry : 
this.delayLevelTable.entrySet()) {
                 Integer level = entry.getKey();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
index b355647255..11bde5f5fe 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -23,7 +23,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -66,7 +66,7 @@ public class TopicRouteInfoManager {
     }
 
     public void start() {
-        this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
+        this.scheduledExecutorService = 
ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
 
         this.scheduledExecutorService.scheduleAtFixedRate(() -> {
             try {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 771d843006..982355d783 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.transaction;
 import io.netty.channel.Channel;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
@@ -27,6 +26,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import 
org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
@@ -97,7 +97,7 @@ public abstract class 
AbstractTransactionalMessageCheckListener {
 
     public synchronized void initExecutorService() {
         if (executorService == null) {
-            executorService = new ThreadPoolExecutor(2, 5, 100, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
+            executorService = ThreadUtils.newThreadPoolExecutor(2, 5, 100, 
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
                 new ThreadFactoryImpl("Transaction-msg-check-thread", 
brokerController.getBrokerIdentity()), new CallerRunsPolicy());
         }
     }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java 
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 75ad961ce9..6035a20acb 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -23,9 +23,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.rocketmq.broker.latency.FutureTaskExt;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.netty.RequestTask;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
index 5d0f7f9d72..31b547cf1b 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.latency;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.junit.Test;
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index a720a5be32..6f19a9815d 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import com.google.common.collect.Maps;
 
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -82,8 +82,8 @@ public abstract class AbstractRocksDBStorage {
     private volatile boolean closed;
 
     private final Semaphore reloadPermit = new Semaphore(1);
-    private final ScheduledExecutorService reloadScheduler = new 
ScheduledThreadPoolExecutor(1, new 
ThreadFactoryImpl("RocksDBStorageReloadService_"));
-    private final ThreadPoolExecutor manualCompactionThread = new 
ThreadPoolExecutor(
+    private final ScheduledExecutorService reloadScheduler = 
ThreadUtils.newScheduledThreadPool(1, new 
ThreadFactoryImpl("RocksDBStorageReloadService_"));
+    private final ThreadPoolExecutor manualCompactionThread = 
(ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
             1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
             new ArrayBlockingQueue(1),
             new ThreadFactoryImpl("RocksDBManualCompactionService_"),
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
 
b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
index 411da92219..7b68873a99 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
@@ -29,7 +29,8 @@ public class FutureTaskExtThreadPoolExecutor extends 
ThreadPoolExecutor {
 
     public FutureTaskExtThreadPoolExecutor(int corePoolSize, int 
maximumPoolSize, long keepAliveTime,
         TimeUnit unit,
-        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+        BlockingQueue<Runnable> workQueue,
+        ThreadFactory threadFactory,
         RejectedExecutionHandler handler) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, handler);
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java 
b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
index 49d97a5d72..1bfabbffed 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
@@ -22,12 +22,12 @@ import 
com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
@@ -36,7 +36,7 @@ public class ThreadPoolMonitor {
     private static Logger waterMarkLogger = 
LoggerFactory.getLogger(ThreadPoolMonitor.class);
 
     private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new 
CopyOnWriteArrayList<>();
-    private static final ScheduledExecutorService MONITOR_SCHEDULED = 
Executors.newSingleThreadScheduledExecutor(
+    private static final ScheduledExecutorService MONITOR_SCHEDULED = 
ThreadUtils.newSingleThreadScheduledExecutor(
         new 
ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
     );
 
@@ -81,7 +81,7 @@ public class ThreadPoolMonitor {
         String name,
         int queueCapacity,
         List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
-        ThreadPoolExecutor executor = new FutureTaskExtThreadPoolExecutor(
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) 
ThreadUtils.newThreadPoolExecutor(
             corePoolSize,
             maximumPoolSize,
             keepAliveTime,
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java 
b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
index 4b366d4e39..1644c6360e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
@@ -20,38 +20,94 @@ package org.apache.rocketmq.common.utils;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.thread.FutureTaskExtThreadPoolExecutor;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 public final class ThreadUtils {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
 
-    public static ExecutorService newThreadPoolExecutor(int corePoolSize, int 
maximumPoolSize, long keepAliveTime,
-        TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName, 
boolean isDaemon) {
-        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 
keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
+    public static ExecutorService newSingleThreadExecutor(String processName, 
boolean isDaemon) {
+        return 
ThreadUtils.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
     }
 
-    public static ExecutorService newSingleThreadExecutor(String processName, 
boolean isDaemon) {
-        return Executors.newSingleThreadExecutor(newThreadFactory(processName, 
isDaemon));
+    public static ExecutorService newSingleThreadExecutor(ThreadFactory 
threadFactory) {
+        return ThreadUtils.newThreadPoolExecutor(1, threadFactory);
+    }
+
+    public static ExecutorService newThreadPoolExecutor(int corePoolSize, 
ThreadFactory threadFactory) {
+        return ThreadUtils.newThreadPoolExecutor(corePoolSize, corePoolSize,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            threadFactory);
+    }
+
+    public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+        int maximumPoolSize,
+        long keepAliveTime,
+        TimeUnit unit, BlockingQueue<Runnable> workQueue,
+        String processName,
+        boolean isDaemon) {
+        return ThreadUtils.newThreadPoolExecutor(corePoolSize, 
maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, 
isDaemon));
+    }
+
+    public static ExecutorService newThreadPoolExecutor(final int corePoolSize,
+        final int maximumPoolSize,
+        final long keepAliveTime,
+        final TimeUnit unit,
+        final BlockingQueue<Runnable> workQueue,
+        final ThreadFactory threadFactory) {
+        return ThreadUtils.newThreadPoolExecutor(corePoolSize, 
maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new 
ThreadPoolExecutor.AbortPolicy());
+    }
+
+    public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+        int maximumPoolSize,
+        long keepAliveTime,
+        TimeUnit unit,
+        BlockingQueue<Runnable> workQueue,
+        ThreadFactory threadFactory,
+        RejectedExecutionHandler handler) {
+        return new FutureTaskExtThreadPoolExecutor(corePoolSize, 
maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
     }
 
     public static ScheduledExecutorService 
newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
-        return 
Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, 
isDaemon));
+        return ThreadUtils.newScheduledThreadPool(1, processName, isDaemon);
+    }
+
+    public static ScheduledExecutorService 
newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
+        return ThreadUtils.newScheduledThreadPool(1, threadFactory);
+    }
+
+    public static ScheduledExecutorService newScheduledThreadPool(int 
corePoolSize) {
+        return ThreadUtils.newScheduledThreadPool(corePoolSize, 
Executors.defaultThreadFactory());
     }
 
-    public static ScheduledExecutorService newFixedThreadScheduledPool(int 
nThreads, String processName,
+    public static ScheduledExecutorService newScheduledThreadPool(int 
corePoolSize, String processName,
         boolean isDaemon) {
-        return Executors.newScheduledThreadPool(nThreads, 
newThreadFactory(processName, isDaemon));
+        return ThreadUtils.newScheduledThreadPool(corePoolSize, 
newThreadFactory(processName, isDaemon));
+    }
+
+    public static ScheduledExecutorService newScheduledThreadPool(int 
corePoolSize, ThreadFactory threadFactory) {
+        return ThreadUtils.newScheduledThreadPool(corePoolSize, threadFactory, 
new ThreadPoolExecutor.AbortPolicy());
+    }
+
+    public static ScheduledExecutorService newScheduledThreadPool(int 
corePoolSize,
+        ThreadFactory threadFactory,
+        RejectedExecutionHandler handler) {
+        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, 
handler);
     }
 
     public static ThreadFactory newThreadFactory(String processName, boolean 
isDaemon) {
-        return newGenericThreadFactory("Remoting-" + processName, isDaemon);
+        return newGenericThreadFactory("ThreadUtils-" + processName, isDaemon);
     }
 
     public static ThreadFactory newGenericThreadFactory(String processName) {
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index c6446f058f..5b712bc30d 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -47,14 +47,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 public class BrokerContainer implements IBrokerContainer {
     private static final Logger LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
-    private final ScheduledExecutorService scheduledExecutorService = new 
ScheduledThreadPoolExecutor(1,
+    private final ScheduledExecutorService scheduledExecutorService = 
ThreadUtils.newScheduledThreadPool(1,
         new BasicThreadFactory.Builder()
             .namingPattern("BrokerContainerScheduledThread")
             .daemon(true)
@@ -143,7 +141,7 @@ public class BrokerContainer implements IBrokerContainer {
         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, 
this.containerClientHouseKeepingService);
         this.fastRemotingServer = 
this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() - 
2);
 
-        this.brokerContainerExecutor = new ThreadPoolExecutor(
+        this.brokerContainerExecutor = ThreadUtils.newThreadPoolExecutor(
             1,
             1,
             1000 * 60,
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 7c91e70da5..3e6b0eba51 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
@@ -34,8 +32,8 @@ import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.future.FutureTaskExt;
 
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
 import org.apache.rocketmq.controller.impl.DLedgerController;
 import 
org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
@@ -93,18 +91,14 @@ public class ControllerManager {
 
     public boolean initialize() {
         this.controllerRequestThreadPoolQueue = new 
LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
-        this.controllerRequestExecutor = new ThreadPoolExecutor(
+        this.controllerRequestExecutor = ThreadUtils.newThreadPoolExecutor(
             this.controllerConfig.getControllerThreadPoolNums(),
             this.controllerConfig.getControllerThreadPoolNums(),
             1000 * 60,
             TimeUnit.MILLISECONDS,
             this.controllerRequestThreadPoolQueue,
-            new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
-            @Override
-            protected <T> RunnableFuture<T> newTaskFor(final Runnable 
runnable, final T value) {
-                return new FutureTaskExt<T>(runnable, value);
-            }
-        };
+            new ThreadFactoryImpl("ControllerRequestExecutorThread_"));
+
         this.notifyService.initialize();
         if 
(StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
             throw new IllegalArgumentException("Attribute value 
controllerDLegerPeers of ControllerConfig is null or empty");
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index fa91f288e2..33e4406e40 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -44,6 +43,7 @@ import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.controller.Controller;
 import org.apache.rocketmq.controller.elect.ElectPolicy;
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
@@ -66,11 +66,11 @@ import 
org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
@@ -136,7 +136,7 @@ public class DLedgerController implements Controller {
         this.dLedgerServer = new DLedgerServer(dLedgerConfig, 
nettyServerConfig, nettyClientConfig, channelEventListener);
         this.dLedgerServer.registerStateMachine(this.statemachine);
         
this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
-        this.scanInactiveMasterService = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
+        this.scanInactiveMasterService = 
ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
         this.brokerLifecycleListeners = new ArrayList<>();
     }
 
@@ -513,7 +513,7 @@ public class DLedgerController implements Controller {
     class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
 
         private final String selfId;
-        private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(new 
ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
+        private final ExecutorService executorService = 
ThreadUtils.newSingleThreadExecutor(new 
ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
         private volatile MemberState.Role currentRole = 
MemberState.Role.FOLLOWER;
 
         public RoleChangeHandler(final String selfId) {
diff --git 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
index 2fbddb9cdf..6ebb2c9942 100644
--- 
a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
+++ 
b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ControllerConfig;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.controller.BrokerHeartbeatManager;
 import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class DefaultBrokerHeartbeatManager implements 
BrokerHeartbeatManager {
 
     @Override
     public void initialize() {
-        this.scheduledService = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
+        this.scheduledService = 
ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
         this.executor = Executors.newFixedThreadPool(2, new 
ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
     }
 
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 15c65ebec9..be327cffa5 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -20,10 +20,7 @@ import java.util.Collections;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -31,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
@@ -62,10 +60,10 @@ public class NamesrvController {
     private final NettyServerConfig nettyServerConfig;
     private final NettyClientConfig nettyClientConfig;
 
-    private final ScheduledExecutorService scheduledExecutorService = new 
ScheduledThreadPoolExecutor(1,
+    private final ScheduledExecutorService scheduledExecutorService = 
ThreadUtils.newScheduledThreadPool(1,
             new 
BasicThreadFactory.Builder().namingPattern("NSScheduledThread").daemon(true).build());
 
-    private final ScheduledExecutorService scanExecutorService = new 
ScheduledThreadPoolExecutor(1,
+    private final ScheduledExecutorService scanExecutorService = 
ThreadUtils.newScheduledThreadPool(1,
             new 
BasicThreadFactory.Builder().namingPattern("NSScanScheduledThread").daemon(true).build());
 
     private final KVConfigManager kvConfigManager;
@@ -138,20 +136,10 @@ public class NamesrvController {
 
     private void initiateThreadExecutors() {
         this.defaultThreadPoolQueue = new 
LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());
-        this.defaultExecutor = new 
ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), 
this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, 
TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new 
ThreadFactoryImpl("RemotingExecutorThread_")) {
-            @Override
-            protected <T> RunnableFuture<T> newTaskFor(final Runnable 
runnable, final T value) {
-                return new FutureTaskExt<>(runnable, value);
-            }
-        };
+        this.defaultExecutor = 
ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(),
 this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, 
TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new 
ThreadFactoryImpl("RemotingExecutorThread_"));
 
         this.clientRequestThreadPoolQueue = new 
LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
-        this.clientRequestExecutor = new 
ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), 
this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, 
TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new 
ThreadFactoryImpl("ClientRequestExecutorThread_")) {
-            @Override
-            protected <T> RunnableFuture<T> newTaskFor(final Runnable 
runnable, final T value) {
-                return new FutureTaskExt<>(runnable, value);
-            }
-        };
+        this.clientRequestExecutor = 
ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(),
 this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, 
TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new 
ThreadFactoryImpl("ClientRequestExecutorThread_"));
     }
 
     private void initiateSslContext() {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
index 14330dd8d4..a18cf7600c 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
@@ -21,13 +21,13 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
@@ -43,7 +43,7 @@ public class GrpcChannelManager implements StartAndShutdown {
     protected final AtomicLong nonceIdGenerator = new AtomicLong(0);
     protected final ConcurrentMap<String /* nonce */, ResultFuture> 
resultNonceFutureMap = new ConcurrentHashMap<>();
 
-    protected final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
+    protected final ScheduledExecutorService scheduledExecutorService = 
ThreadUtils.newSingleThreadScheduledExecutor(
         new ThreadFactoryImpl("GrpcChannelManager_")
     );
 
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index bcc9edd091..fe07090d50 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -178,7 +178,7 @@ public class RemotingProtocolServer implements 
StartAndShutdown, RemotingProxyOu
             new 
ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue())
         );
 
-        this.timerExecutor = Executors.newSingleThreadScheduledExecutor(
+        this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor(
             new 
ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build()
         );
         this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 
10, TimeUnit.SECONDS);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index d2ddfc3527..9786cec557 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.proxy.service;
 
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
@@ -27,23 +26,24 @@ import 
org.apache.rocketmq.broker.client.ProducerChangeListener;
 import org.apache.rocketmq.broker.client.ProducerGroupEvent;
 import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.client.common.NameserverAccessConfig;
+import 
org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.service.admin.AdminService;
 import org.apache.rocketmq.proxy.service.admin.DefaultAdminService;
 import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager;
+import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor;
 import org.apache.rocketmq.proxy.service.message.ClusterMessageService;
 import org.apache.rocketmq.proxy.service.message.MessageService;
 import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
-import 
org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
-import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor;
 import org.apache.rocketmq.proxy.service.relay.ClusterProxyRelayService;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
 import org.apache.rocketmq.proxy.service.route.ClusterTopicRouteService;
@@ -73,7 +73,7 @@ public class ClusterServiceManager extends 
AbstractStartAndShutdown implements S
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         NameserverAccessConfig nameserverAccessConfig = new 
NameserverAccessConfig(proxyConfig.getNamesrvAddr(),
             proxyConfig.getNamesrvDomain(), 
proxyConfig.getNamesrvDomainSubgroup());
-        this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
+        this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(3);
 
         this.messagingClientAPIFactory = new MQClientAPIFactory(
             nameserverAccessConfig,
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
index 4d1ca7b669..59cd92685a 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.proxy.service;
 
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
@@ -28,6 +27,7 @@ import 
org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.service.admin.AdminService;
@@ -58,7 +58,7 @@ public class LocalServiceManager extends 
AbstractStartAndShutdown implements Ser
     private final MQClientAPIFactory mqClientAPIFactory;
     private final ChannelManager channelManager;
 
-    private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
+    private final ScheduledExecutorService scheduledExecutorService = 
ThreadUtils.newSingleThreadScheduledExecutor(
         new ThreadFactoryImpl("LocalServiceManagerScheduledThread"));
 
     public LocalServiceManager(BrokerController brokerController, RPCHook 
rpcHook) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 69f44344a0..207603fe81 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -42,20 +41,21 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
+import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -68,7 +68,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
     protected final StateEventListener<RenewEvent> eventListener;
     protected final static RetryPolicy RENEW_POLICY = new 
RenewStrategyPolicy();
     protected final ScheduledExecutorService scheduledExecutorService =
-        Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("RenewalScheduledThread_"));
+        ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("RenewalScheduledThread_"));
     protected final ThreadPoolExecutor renewalWorkerService;
 
     public DefaultReceiptHandleManager(MetadataService metadataService, 
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index caf62a1e02..ccf094c03a 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -19,25 +19,24 @@ package org.apache.rocketmq.proxy.service.route;
 import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.base.Optional;
 import java.time.Duration;
 import java.util.List;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Optional;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.client.latency.MQFaultStrategy;
 import org.apache.rocketmq.client.latency.Resolver;
 import org.apache.rocketmq.client.latency.ServiceDetector;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.Address;
@@ -63,7 +62,7 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
     public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
 
-        this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(
+        this.scheduledExecutorService = 
ThreadUtils.newSingleThreadScheduledExecutor(
             new ThreadFactoryImpl("TopicRouteService_")
         );
         this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 8491f4354c..64621dd6c4 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +70,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -142,7 +142,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
 
         this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, 
new ThreadFactoryImpl("NettyClientPublicExecutor_"));
 
-        this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+        this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, 
TimeUnit.SECONDS,
             new ArrayBlockingQueue<>(32), new 
ThreadFactoryImpl("NettyClientScan_thread_"));
 
         if (eventLoopGroup != null) {
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index e626260c93..aa0d46542b 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.constant.HAProxyConstants;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.BinaryUtil;
 import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -83,7 +84,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -171,7 +171,7 @@ public class NettyRemotingServer extends 
NettyRemotingAbstract implements Remoti
     }
 
     private ScheduledExecutorService buildScheduleExecutor() {
-        return new ScheduledThreadPoolExecutor(1,
+        return ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("NettyServerScheduler_", true),
             new ThreadPoolExecutor.DiscardOldestPolicy());
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index f2a54ddf69..02ea47f13a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -83,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -205,7 +205,7 @@ public class DefaultMessageStore implements MessageStore {
     private ConcurrentMap<String, TopicConfig> topicConfigTable;
 
     private final ScheduledExecutorService scheduledCleanQueueExecutorService =
-        Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
+        ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
 
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, 
final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final 
BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> 
topicConfigTable) throws IOException {
@@ -253,7 +253,7 @@ public class DefaultMessageStore implements MessageStore {
         this.transientStorePool = new 
TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), 
messageStoreConfig.getMappedFileSizeCommitLog());
 
         this.scheduledExecutorService =
-            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
+            ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
 
         this.dispatcherList = new LinkedList<>();
         this.dispatcherList.addLast(new 
CommitLogDispatcherBuildConsumeQueue());
@@ -2915,7 +2915,7 @@ public class DefaultMessageStore implements MessageStore {
         private final ExecutorService batchDispatchRequestExecutor;
 
         public MainBatchDispatchRequestService() {
-            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+            batchDispatchRequestExecutor = ThreadUtils.newThreadPoolExecutor(
                     
DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(),
                     
DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(),
                     1000 * 60,
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index d5393fdca4..f20bc3e280 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -17,10 +17,26 @@
 
 package org.apache.rocketmq.store.ha.autoswitch;
 
-
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.EpochEntry;
@@ -36,30 +52,12 @@ import org.apache.rocketmq.store.ha.HAClient;
 import org.apache.rocketmq.store.ha.HAConnection;
 import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
 
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
 /**
  * SwitchAble ha service, support switch role to master or slave.
  */
 public class AutoSwitchHAService extends DefaultHAService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-    private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(new 
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
+    private final ExecutorService executorService = 
ThreadUtils.newSingleThreadExecutor(new 
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
     private final ConcurrentHashMap<Long/*brokerId*/, 
Long/*lastCaughtUpTimestamp*/> connectionCaughtUpTimeTable = new 
ConcurrentHashMap<>();
     private final List<Consumer<Set<Long/*brokerId*/>>> 
syncStateSetChangedListeners = new ArrayList<>();
     private final Set<Long/*brokerId*/> syncStateSet = new HashSet<>();
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java 
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index b37c907267..639084fa2d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -16,17 +16,25 @@
  */
 package org.apache.rocketmq.store.kv;
 
-import java.util.Random;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.attribute.CleanupPolicy;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -35,15 +43,6 @@ import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 public class CompactionStore {
 
     public static final String COMPACTION_DIR = "compaction";
@@ -76,7 +75,7 @@ public class CompactionStore {
         this.positionMgr = new CompactionPositionMgr(compactionPath);
         this.compactionThreadNum = 
Math.min(Runtime.getRuntime().availableProcessors(), Math.max(1, 
config.getCompactionThreadNum()));
 
-        this.compactionSchedule = 
Executors.newScheduledThreadPool(this.compactionThreadNum,
+        this.compactionSchedule = 
ThreadUtils.newScheduledThreadPool(this.compactionThreadNum,
             new ThreadFactoryImpl("compactionSchedule_"));
         this.offsetMapSize = config.getMaxOffsetMapSize() / 
compactionThreadNum;
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 8d38503b37..d03d15d653 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
@@ -34,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.CommitLog;
@@ -175,7 +175,7 @@ public class ConsumeQueueStore {
     }
 
     private ExecutorService buildExecutorService(BlockingQueue<Runnable> 
blockingQueue, String threadNamePrefix) {
-        return new ThreadPoolExecutor(
+        return ThreadUtils.newThreadPoolExecutor(
             this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
             this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
             1000 * 60,
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 2dd3fc5b52..489d7b4fbc 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.store.stats;
 
 import java.util.HashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -32,13 +31,14 @@ import 
org.apache.rocketmq.common.statistics.StatisticsItemScheduledPrinter;
 import org.apache.rocketmq.common.statistics.StatisticsItemStateGetter;
 import org.apache.rocketmq.common.statistics.StatisticsKindMeta;
 import org.apache.rocketmq.common.statistics.StatisticsManager;
+import org.apache.rocketmq.common.stats.MomentStatsItemSet;
 import org.apache.rocketmq.common.stats.Stats;
+import org.apache.rocketmq.common.stats.StatsItem;
+import org.apache.rocketmq.common.stats.StatsItemSet;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.stats.MomentStatsItemSet;
-import org.apache.rocketmq.common.stats.StatsItem;
-import org.apache.rocketmq.common.stats.StatsItemSet;
 
 public class BrokerStatsManager {
 
@@ -281,11 +281,11 @@ public class BrokerStatsManager {
 
     private void initScheduleService() {
         this.scheduledExecutorService =
-            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig));
+            ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig));
         this.commercialExecutor =
-            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
+            ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
         this.accountExecutor =
-            Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
+            ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
     }
 
     public MomentStatsItemSet getMomentStatsItemSetFallSize() {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 181f7087ae..0d50de65ae 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -35,7 +35,6 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +53,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.ConsumeQueue;
@@ -174,11 +174,11 @@ public class TimerMessageStore {
         this.lastBrokerRole = storeConfig.getBrokerRole();
 
         if (messageStore instanceof DefaultMessageStore) {
-            scheduler = Executors.newSingleThreadScheduledExecutor(
+            scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
                 new ThreadFactoryImpl("TimerScheduledThread",
                     ((DefaultMessageStore) messageStore).getBrokerIdentity()));
         } else {
-            scheduler = Executors.newSingleThreadScheduledExecutor(
+            scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
                 new ThreadFactoryImpl("TimerScheduledThread"));
         }
 
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
index f3d105bc6b..080b7e3852 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.annotation.Generated;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 6dd0e8846e..65d586f43d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -20,10 +20,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 
 public class TieredStoreExecutor {
 
@@ -43,20 +43,20 @@ public class TieredStoreExecutor {
     public static ExecutorService compactIndexFileExecutor;
 
     public static void init() {
-        commonScheduledExecutor = new ScheduledThreadPoolExecutor(
+        commonScheduledExecutor = ThreadUtils.newScheduledThreadPool(
             Math.max(4, Runtime.getRuntime().availableProcessors()),
             new ThreadFactoryImpl("TieredCommonExecutor_"));
 
-        commitExecutor = new ScheduledThreadPoolExecutor(
+        commitExecutor = ThreadUtils.newScheduledThreadPool(
             Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
             new ThreadFactoryImpl("TieredCommitExecutor_"));
 
-        cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
+        cleanExpiredFileExecutor = ThreadUtils.newScheduledThreadPool(
             Math.max(4, Runtime.getRuntime().availableProcessors()),
             new ThreadFactoryImpl("TieredCleanFileExecutor_"));
 
         dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
-        dispatchExecutor = new ThreadPoolExecutor(
+        dispatchExecutor = ThreadUtils.newThreadPoolExecutor(
             Math.max(2, Runtime.getRuntime().availableProcessors()),
             Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
             1000 * 60,
@@ -66,7 +66,7 @@ public class TieredStoreExecutor {
             new ThreadPoolExecutor.DiscardOldestPolicy());
 
         fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
-        fetchDataExecutor = new ThreadPoolExecutor(
+        fetchDataExecutor = ThreadUtils.newThreadPoolExecutor(
             Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
             Math.max(64, Runtime.getRuntime().availableProcessors() * 8),
             1000 * 60,
@@ -75,7 +75,7 @@ public class TieredStoreExecutor {
             new ThreadFactoryImpl("TieredFetchExecutor_"));
 
         compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
-        compactIndexFileExecutor = new ThreadPoolExecutor(
+        compactIndexFileExecutor = ThreadUtils.newThreadPoolExecutor(
             1,
             1,
             1000 * 60,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index fa3596d51c..1ebff6d8af 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -66,6 +66,7 @@ import org.apache.rocketmq.common.namesrv.NamesrvUtil;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -193,7 +194,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
                 int threadPoolCoreSize = Integer.parseInt(System.getProperty("rocketmq.admin.threadpool.coresize", "20"));
 
-                this.threadPoolExecutor = new ThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
+                this.threadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
 
                 break;
             case RUNNING:

Reply via email to