This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c100d815d7 [ISSUE #7328] Convergent thread pool creation (#7329)
c100d815d7 is described below
commit c100d815d754d7cb330bc63e145bafd2d9b59cb1
Author: guyinyou <[email protected]>
AuthorDate: Mon Sep 11 10:13:56 2023 +0800
[ISSUE #7328] Convergent thread pool creation (#7329)
* Converge thread pool creation to facilitate subsequent iteration and
management
* Converge thread pool creation in ThreadPoolMonitor.java
* fix unit test
* Converge ThreadPool constructor
* Converge ScheduledThreadPool constructor
* remove unused import
* Converge ScheduledThreadPool constructor
* remove unused import
---------
---
.../apache/rocketmq/broker/BrokerController.java | 39 ++++++------
.../broker/client/ClientHousekeepingService.java | 4 +-
.../client/DefaultConsumerIdsChangeListener.java | 3 +-
.../broker/controller/ReplicasManager.java | 9 ++-
.../broker/dledger/DLedgerRoleChangeHandler.java | 4 +-
.../rocketmq/broker/failover/EscapeBridge.java | 4 +-
.../rocketmq/broker/latency/BrokerFastFailure.java | 5 +-
.../latency/BrokerFixedThreadPoolExecutor.java | 57 -----------------
.../rocketmq/broker/latency/FutureTaskExt.java | 39 ------------
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 7 +-
.../broker/schedule/ScheduleMessageService.java | 7 +-
.../broker/topic/TopicRouteInfoManager.java | 4 +-
.../AbstractTransactionalMessageCheckListener.java | 4 +-
.../rocketmq/broker/BrokerControllerTest.java | 2 +-
.../broker/latency/BrokerFastFailureTest.java | 1 +
.../common/config/AbstractRocksDBStorage.java | 6 +-
.../thread/FutureTaskExtThreadPoolExecutor.java | 3 +-
.../rocketmq/common/thread/ThreadPoolMonitor.java | 6 +-
.../apache/rocketmq/common/utils/ThreadUtils.java | 74 +++++++++++++++++++---
.../apache/rocketmq/container/BrokerContainer.java | 6 +-
.../rocketmq/controller/ControllerManager.java | 14 ++--
.../controller/impl/DLedgerController.java | 10 +--
.../heartbeat/DefaultBrokerHeartbeatManager.java | 3 +-
.../apache/rocketmq/namesrv/NamesrvController.java | 22 ++-----
.../proxy/grpc/v2/channel/GrpcChannelManager.java | 6 +-
.../proxy/remoting/RemotingProtocolServer.java | 4 +-
.../proxy/service/ClusterServiceManager.java | 12 ++--
.../proxy/service/LocalServiceManager.java | 4 +-
.../receipt/DefaultReceiptHandleManager.java | 8 +--
.../proxy/service/route/TopicRouteService.java | 9 ++-
.../remoting/netty/NettyRemotingClient.java | 4 +-
.../remoting/netty/NettyRemotingServer.java | 4 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 8 +--
.../store/ha/autoswitch/AutoSwitchHAService.java | 38 ++++++-----
.../apache/rocketmq/store/kv/CompactionStore.java | 21 +++---
.../rocketmq/store/queue/ConsumeQueueStore.java | 4 +-
.../rocketmq/store/stats/BrokerStatsManager.java | 14 ++--
.../rocketmq/store/timer/TimerMessageStore.java | 6 +-
.../org/apache/rocketmq/test/util/StatUtil.java | 1 -
.../tieredstore/common/TieredStoreExecutor.java | 14 ++--
.../tools/admin/DefaultMQAdminExtImpl.java | 3 +-
41 files changed, 215 insertions(+), 278 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6aba70cb21..275b64b1ab 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -34,7 +34,6 @@ import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -98,6 +97,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.Configuration;
@@ -160,7 +160,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -455,10 +454,10 @@ public class BrokerController {
* Initialize resources including remoting server and thread executors.
*/
protected void initializeResources() {
- this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerControllerScheduledThread", true,
getBrokerIdentity()));
- this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
@@ -466,7 +465,7 @@ public class BrokerController {
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
- this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
@@ -474,7 +473,7 @@ public class BrokerController {
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));
- this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.litePullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getLitePullMessageThreadPoolNums(),
this.brokerConfig.getLitePullMessageThreadPoolNums(),
1000 * 60,
@@ -482,7 +481,7 @@ public class BrokerController {
this.litePullThreadPoolQueue,
new ThreadFactoryImpl("LitePullMessageThread_",
getBrokerIdentity()));
- this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
+ this.putMessageFutureExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
1000 * 60,
@@ -490,7 +489,7 @@ public class BrokerController {
this.putThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
- this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.ackMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getAckMessageThreadPoolNums(),
this.brokerConfig.getAckMessageThreadPoolNums(),
1000 * 60,
@@ -498,7 +497,7 @@ public class BrokerController {
this.ackThreadPoolQueue,
new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));
- this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.queryMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
@@ -506,7 +505,7 @@ public class BrokerController {
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));
- this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor(
+ this.adminBrokerExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getAdminBrokerThreadPoolNums(),
this.brokerConfig.getAdminBrokerThreadPoolNums(),
1000 * 60,
@@ -514,7 +513,7 @@ public class BrokerController {
this.adminBrokerThreadPoolQueue,
new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));
- this.clientManageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.clientManageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
@@ -522,7 +521,7 @@ public class BrokerController {
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));
- this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
+ this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
@@ -530,7 +529,7 @@ public class BrokerController {
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true,
getBrokerIdentity()));
- this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getConsumerManageThreadPoolNums(),
this.brokerConfig.getConsumerManageThreadPoolNums(),
1000 * 60,
@@ -538,7 +537,7 @@ public class BrokerController {
this.consumerManagerThreadPoolQueue,
new ThreadFactoryImpl("ConsumerManageThread_", true,
getBrokerIdentity()));
- this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.replyMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
@@ -546,7 +545,7 @@ public class BrokerController {
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_",
getBrokerIdentity()));
- this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
+ this.endTransactionExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
@@ -554,7 +553,7 @@ public class BrokerController {
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_",
getBrokerIdentity()));
- this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor(
+ this.loadBalanceExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
1000 * 60,
@@ -562,9 +561,9 @@ public class BrokerController {
this.loadBalanceThreadPoolQueue,
new ThreadFactoryImpl("LoadBalanceProcessorThread_",
getBrokerIdentity()));
- this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
+ this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread",
getBrokerIdentity()));
- this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
+ this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread",
getBrokerIdentity()));
this.topicQueueMappingCleanService = new
TopicQueueMappingCleanService(this);
@@ -828,8 +827,6 @@ public class BrokerController {
initializeResources();
- registerProcessor();
-
initializeScheduledTasks();
initialTransaction();
@@ -1690,6 +1687,8 @@ public class BrokerController {
}
}
}, 10, 5, TimeUnit.SECONDS);
+
+ registerProcessor();
}
protected void scheduleSendHeartbeat() {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index 98e5f450f3..cbb81f632b 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -18,11 +18,11 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -35,7 +35,7 @@ public class ClientHousekeepingService implements
ChannelEventListener {
public ClientHousekeepingService(final BrokerController brokerController) {
this.brokerController = brokerController;
- scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("ClientHousekeepingScheduledThread",
brokerController.getBrokerIdentity()));
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index 2ce036a0ff..d17a2a5470 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -37,7 +36,7 @@ public class DefaultConsumerIdsChangeListener implements
ConsumerIdsChangeListen
private final BrokerController brokerController;
private final int cacheSize = 8096;
- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true));
private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new
ConcurrentHashMap<>(cacheSize);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 37c82e434b..a989e6e68f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -27,10 +27,8 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
@@ -42,6 +40,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
@@ -107,9 +106,9 @@ public class ReplicasManager {
public ReplicasManager(final BrokerController brokerController) {
this.brokerController = brokerController;
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
- this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
- this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
- this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+ this.scheduledService = ThreadUtils.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
+ this.executorService = ThreadUtils.newThreadPoolExecutor(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
+ this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(32), new
ThreadFactoryImpl("ReplicasManager_scan_thread_",
brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService)
brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
index 75023ee1b8..e6cb97640b 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
@@ -21,12 +21,12 @@ import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -49,7 +49,7 @@ public class DLedgerRoleChangeHandler implements
DLedgerLeaderElector.RoleChange
this.messageStore = messageStore;
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
- this.executorService = Executors.newSingleThreadExecutor(
+ this.executorService = ThreadUtils.newSingleThreadExecutor(
new ThreadFactoryImpl("DLegerRoleChangeHandler_",
brokerController.getBrokerIdentity()));
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 7c350fc1d7..6a08174801 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
@@ -43,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -72,7 +72,7 @@ public class EscapeBridge {
public void start() throws Exception {
if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() &&
brokerController.getBrokerConfig().isEnableRemoteEscape()) {
final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new
LinkedBlockingQueue<>(50000);
- this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
+ this.defaultAsyncSenderExecutor = ThreadUtils.newThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index d3d0bc8ba3..3b6e9dc676 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -18,13 +18,14 @@ package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.netty.RequestTask;
@@ -43,7 +44,7 @@ public class BrokerFastFailure {
public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
- this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
brokerController == null ? null :
brokerController.getBrokerConfig()));
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
deleted file mode 100644
index d2d1143a34..0000000000
---
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.latency;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int
maximumPoolSize, final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int
maximumPoolSize, final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue, final ThreadFactory
threadFactory) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int
maximumPoolSize, final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue, final
RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
handler);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int
maximumPoolSize, final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue, final ThreadFactory
threadFactory,
- final RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
- }
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final
T value) {
- return new FutureTaskExt<>(runnable, value);
- }
-}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
deleted file mode 100644
index f132efaebc..0000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.latency;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-public class FutureTaskExt<V> extends FutureTask<V> {
- private final Runnable runnable;
-
- public FutureTaskExt(final Callable<V> callable) {
- super(callable);
- this.runnable = null;
- }
-
- public FutureTaskExt(final Runnable runnable, final V result) {
- super(runnable, result);
- this.runnable = runnable;
- }
-
- public Runnable getRunnable() {
- return runnable;
- }
-}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index ae81e8b11d..9dfb8127d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -27,9 +27,9 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -59,6 +59,7 @@ import
org.apache.rocketmq.common.namesrv.DefaultTopAddressing;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
@@ -144,7 +145,7 @@ public class BrokerOuterAPI {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
private final TopAddressing topAddressing = new
DefaultTopAddressing(MixAll.getWSAddr());
- private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
+ private final ExecutorService brokerOuterExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(32), new
ThreadFactoryImpl("brokerOutApi_thread_", true));
private final ClientMetadata clientMetadata;
private final RpcClient rpcClient;
@@ -1092,7 +1093,7 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public BrokerFixedThreadPoolExecutor getBrokerOuterExecutor() {
+ public ExecutorService getBrokerOuterExecutor() {
return brokerOuterExecutor;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 297b14207c..0c2e6507bd 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager {
public ScheduleMessageService(final BrokerController brokerController) {
this.brokerController = brokerController;
this.enableAsyncDeliver =
brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
- scheduledPersistService = new ScheduledThreadPoolExecutor(1,
+ scheduledPersistService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true,
brokerController.getBrokerConfig()));
}
@@ -134,9 +133,9 @@ public class ScheduleMessageService extends ConfigManager {
public void start() {
if (started.compareAndSet(false, true)) {
this.load();
- this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
+ this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
- this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
+ this.handleExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
for (Map.Entry<Integer, Long> entry :
this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
index b355647255..11bde5f5fe 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -23,7 +23,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -66,7 +66,7 @@ public class TopicRouteInfoManager {
}
public void start() {
- this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
+ this.scheduledExecutorService =
ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 771d843006..982355d783 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.transaction;
import io.netty.channel.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
@@ -27,6 +26,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import
org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
@@ -97,7 +97,7 @@ public abstract class
AbstractTransactionalMessageCheckListener {
public synchronized void initExecutorService() {
if (executorService == null) {
- executorService = new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
+ executorService = ThreadUtils.newThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
new ThreadFactoryImpl("Transaction-msg-check-thread",
brokerController.getBrokerIdentity()), new CallerRunsPolicy());
}
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 75ad961ce9..6035a20acb 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -23,9 +23,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.latency.FutureTaskExt;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
index 5d0f7f9d72..31b547cf1b 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.junit.Test;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index a720a5be32..6f19a9815d 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import com.google.common.collect.Maps;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -82,8 +82,8 @@ public abstract class AbstractRocksDBStorage {
private volatile boolean closed;
private final Semaphore reloadPermit = new Semaphore(1);
- private final ScheduledExecutorService reloadScheduler = new
ScheduledThreadPoolExecutor(1, new
ThreadFactoryImpl("RocksDBStorageReloadService_"));
- private final ThreadPoolExecutor manualCompactionThread = new
ThreadPoolExecutor(
+ private final ScheduledExecutorService reloadScheduler =
ThreadUtils.newScheduledThreadPool(1, new
ThreadFactoryImpl("RocksDBStorageReloadService_"));
+ private final ThreadPoolExecutor manualCompactionThread =
(ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue(1),
new ThreadFactoryImpl("RocksDBManualCompactionService_"),
diff --git
a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
index 411da92219..7b68873a99 100644
---
a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
+++
b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
@@ -29,7 +29,8 @@ public class FutureTaskExtThreadPoolExecutor extends
ThreadPoolExecutor {
public FutureTaskExtThreadPoolExecutor(int corePoolSize, int
maximumPoolSize, long keepAliveTime,
TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
index 49d97a5d72..1bfabbffed 100644
---
a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
+++
b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
@@ -22,12 +22,12 @@ import
com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,7 +36,7 @@ public class ThreadPoolMonitor {
private static Logger waterMarkLogger =
LoggerFactory.getLogger(ThreadPoolMonitor.class);
private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new
CopyOnWriteArrayList<>();
- private static final ScheduledExecutorService MONITOR_SCHEDULED =
Executors.newSingleThreadScheduledExecutor(
+ private static final ScheduledExecutorService MONITOR_SCHEDULED =
ThreadUtils.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
);
@@ -81,7 +81,7 @@ public class ThreadPoolMonitor {
String name,
int queueCapacity,
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
- ThreadPoolExecutor executor = new FutureTaskExtThreadPoolExecutor(
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)
ThreadUtils.newThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
diff --git
a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
index 4b366d4e39..1644c6360e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
@@ -20,38 +20,94 @@ package org.apache.rocketmq.common.utils;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.thread.FutureTaskExtThreadPoolExecutor;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public final class ThreadUtils {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
- public static ExecutorService newThreadPoolExecutor(int corePoolSize, int
maximumPoolSize, long keepAliveTime,
- TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName,
boolean isDaemon) {
- return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
+ public static ExecutorService newSingleThreadExecutor(String processName,
boolean isDaemon) {
+ return
ThreadUtils.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
}
- public static ExecutorService newSingleThreadExecutor(String processName,
boolean isDaemon) {
- return Executors.newSingleThreadExecutor(newThreadFactory(processName,
isDaemon));
+ public static ExecutorService newSingleThreadExecutor(ThreadFactory
threadFactory) {
+ return ThreadUtils.newThreadPoolExecutor(1, threadFactory);
+ }
+
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize, corePoolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ threadFactory);
+ }
+
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ String processName,
+ boolean isDaemon) {
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName,
isDaemon));
+ }
+
+ public static ExecutorService newThreadPoolExecutor(final int corePoolSize,
+ final int maximumPoolSize,
+ final long keepAliveTime,
+ final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue,
+ final ThreadFactory threadFactory) {
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new
ThreadPoolExecutor.AbortPolicy());
+ }
+
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ return new FutureTaskExtThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
public static ScheduledExecutorService
newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
- return
Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName,
isDaemon));
+ return ThreadUtils.newScheduledThreadPool(1, processName, isDaemon);
+ }
+
+ public static ScheduledExecutorService
newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
+ return ThreadUtils.newScheduledThreadPool(1, threadFactory);
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(int
corePoolSize) {
+ return ThreadUtils.newScheduledThreadPool(corePoolSize,
Executors.defaultThreadFactory());
}
- public static ScheduledExecutorService newFixedThreadScheduledPool(int
nThreads, String processName,
+ public static ScheduledExecutorService newScheduledThreadPool(int
corePoolSize, String processName,
boolean isDaemon) {
- return Executors.newScheduledThreadPool(nThreads,
newThreadFactory(processName, isDaemon));
+ return ThreadUtils.newScheduledThreadPool(corePoolSize,
newThreadFactory(processName, isDaemon));
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(int
corePoolSize, ThreadFactory threadFactory) {
+ return ThreadUtils.newScheduledThreadPool(corePoolSize, threadFactory,
new ThreadPoolExecutor.AbortPolicy());
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(int
corePoolSize,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory,
handler);
}
public static ThreadFactory newThreadFactory(String processName, boolean
isDaemon) {
- return newGenericThreadFactory("Remoting-" + processName, isDaemon);
+ return newGenericThreadFactory("ThreadUtils-" + processName, isDaemon);
}
public static ThreadFactory newGenericThreadFactory(String processName) {
diff --git
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index c6446f058f..5b712bc30d 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -47,14 +47,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BrokerContainer implements IBrokerContainer {
private static final Logger LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ScheduledExecutorService scheduledExecutorService = new
ScheduledThreadPoolExecutor(1,
+ private final ScheduledExecutorService scheduledExecutorService =
ThreadUtils.newScheduledThreadPool(1,
new BasicThreadFactory.Builder()
.namingPattern("BrokerContainerScheduledThread")
.daemon(true)
@@ -143,7 +141,7 @@ public class BrokerContainer implements IBrokerContainer {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
this.containerClientHouseKeepingService);
this.fastRemotingServer =
this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() -
2);
- this.brokerContainerExecutor = new ThreadPoolExecutor(
+ this.brokerContainerExecutor = ThreadUtils.newThreadPoolExecutor(
1,
1,
1000 * 60,
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 7c91e70da5..3e6b0eba51 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
@@ -34,8 +32,8 @@ import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import
org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
@@ -93,18 +91,14 @@ public class ControllerManager {
public boolean initialize() {
this.controllerRequestThreadPoolQueue = new
LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
- this.controllerRequestExecutor = new ThreadPoolExecutor(
+ this.controllerRequestExecutor = ThreadUtils.newThreadPoolExecutor(
this.controllerConfig.getControllerThreadPoolNums(),
this.controllerConfig.getControllerThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.controllerRequestThreadPoolQueue,
- new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable
runnable, final T value) {
- return new FutureTaskExt<T>(runnable, value);
- }
- };
+ new ThreadFactoryImpl("ControllerRequestExecutorThread_"));
+
this.notifyService.initialize();
if
(StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
throw new IllegalArgumentException("Attribute value
controllerDLegerPeers of ControllerConfig is null or empty");
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index fa91f288e2..33e4406e40 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -44,6 +43,7 @@ import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
@@ -66,11 +66,11 @@ import
org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
-import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
-import
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
@@ -136,7 +136,7 @@ public class DLedgerController implements Controller {
this.dLedgerServer = new DLedgerServer(dLedgerConfig,
nettyServerConfig, nettyClientConfig, channelEventListener);
this.dLedgerServer.registerStateMachine(this.statemachine);
this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
- this.scanInactiveMasterService =
Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
+ this.scanInactiveMasterService =
ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
this.brokerLifecycleListeners = new ArrayList<>();
}
@@ -513,7 +513,7 @@ public class DLedgerController implements Controller {
class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
private final String selfId;
- private final ExecutorService executorService =
Executors.newSingleThreadExecutor(new
ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
+ private final ExecutorService executorService =
ThreadUtils.newSingleThreadExecutor(new
ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
private volatile MemberState.Role currentRole =
MemberState.Role.FOLLOWER;
public RoleChangeHandler(final String selfId) {
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
index 2fbddb9cdf..6ebb2c9942 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class DefaultBrokerHeartbeatManager implements
BrokerHeartbeatManager {
@Override
public void initialize() {
- this.scheduledService = Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
+ this.scheduledService =
ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
this.executor = Executors.newFixedThreadPool(2, new
ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 15c65ebec9..be327cffa5 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -20,10 +20,7 @@ import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -31,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
@@ -62,10 +60,10 @@ public class NamesrvController {
private final NettyServerConfig nettyServerConfig;
private final NettyClientConfig nettyClientConfig;
- private final ScheduledExecutorService scheduledExecutorService = new
ScheduledThreadPoolExecutor(1,
+ private final ScheduledExecutorService scheduledExecutorService =
ThreadUtils.newScheduledThreadPool(1,
new
BasicThreadFactory.Builder().namingPattern("NSScheduledThread").daemon(true).build());
- private final ScheduledExecutorService scanExecutorService = new
ScheduledThreadPoolExecutor(1,
+ private final ScheduledExecutorService scanExecutorService =
ThreadUtils.newScheduledThreadPool(1,
new
BasicThreadFactory.Builder().namingPattern("NSScanScheduledThread").daemon(true).build());
private final KVConfigManager kvConfigManager;
@@ -138,20 +136,10 @@ public class NamesrvController {
private void initiateThreadExecutors() {
this.defaultThreadPoolQueue = new
LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());
- this.defaultExecutor = new
ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(),
this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60,
TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new
ThreadFactoryImpl("RemotingExecutorThread_")) {
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable
runnable, final T value) {
- return new FutureTaskExt<>(runnable, value);
- }
- };
+ this.defaultExecutor =
ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(),
this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60,
TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new
ThreadFactoryImpl("RemotingExecutorThread_"));
this.clientRequestThreadPoolQueue = new
LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
- this.clientRequestExecutor = new
ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(),
this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60,
TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new
ThreadFactoryImpl("ClientRequestExecutorThread_")) {
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable
runnable, final T value) {
- return new FutureTaskExt<>(runnable, value);
- }
- };
+ this.clientRequestExecutor =
ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(),
this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60,
TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new
ThreadFactoryImpl("ClientRequestExecutorThread_"));
}
private void initiateSslContext() {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
index 14330dd8d4..a18cf7600c 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
@@ -21,13 +21,13 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
@@ -43,7 +43,7 @@ public class GrpcChannelManager implements StartAndShutdown {
protected final AtomicLong nonceIdGenerator = new AtomicLong(0);
protected final ConcurrentMap<String /* nonce */, ResultFuture>
resultNonceFutureMap = new ConcurrentHashMap<>();
- protected final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
+ protected final ScheduledExecutorService scheduledExecutorService =
ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("GrpcChannelManager_")
);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index bcc9edd091..fe07090d50 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -178,7 +178,7 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
new
ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue())
);
- this.timerExecutor = Executors.newSingleThreadScheduledExecutor(
+ this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build()
);
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10,
10, TimeUnit.SECONDS);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index d2ddfc3527..9786cec557 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.proxy.service;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
@@ -27,23 +26,24 @@ import
org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.client.common.NameserverAccessConfig;
+import
org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.admin.DefaultAdminService;
import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager;
+import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor;
import org.apache.rocketmq.proxy.service.message.ClusterMessageService;
import org.apache.rocketmq.proxy.service.message.MessageService;
import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
-import
org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
-import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor;
import org.apache.rocketmq.proxy.service.relay.ClusterProxyRelayService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.ClusterTopicRouteService;
@@ -73,7 +73,7 @@ public class ClusterServiceManager extends
AbstractStartAndShutdown implements S
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
NameserverAccessConfig nameserverAccessConfig = new
NameserverAccessConfig(proxyConfig.getNamesrvAddr(),
proxyConfig.getNamesrvDomain(),
proxyConfig.getNamesrvDomainSubgroup());
- this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(3);
this.messagingClientAPIFactory = new MQClientAPIFactory(
nameserverAccessConfig,
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
index 4d1ca7b669..59cd92685a 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.proxy.service;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
@@ -28,6 +27,7 @@ import
org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.admin.AdminService;
@@ -58,7 +58,7 @@ public class LocalServiceManager extends
AbstractStartAndShutdown implements Ser
private final MQClientAPIFactory mqClientAPIFactory;
private final ChannelManager channelManager;
- private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
+ private final ScheduledExecutorService scheduledExecutorService =
ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("LocalServiceManagerScheduledThread"));
public LocalServiceManager(BrokerController brokerController, RPCHook
rpcHook) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 69f44344a0..207603fe81 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -24,7 +24,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -42,20 +41,21 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
+import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -68,7 +68,7 @@ public class DefaultReceiptHandleManager extends
AbstractStartAndShutdown implem
protected final StateEventListener<RenewEvent> eventListener;
protected final static RetryPolicy RENEW_POLICY = new
RenewStrategyPolicy();
protected final ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("RenewalScheduledThread_"));
+ ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("RenewalScheduledThread_"));
protected final ThreadPoolExecutor renewalWorkerService;
public DefaultReceiptHandleManager(MetadataService metadataService,
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index caf62a1e02..ccf094c03a 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -19,25 +19,24 @@ package org.apache.rocketmq.proxy.service.route;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.base.Optional;
import java.time.Duration;
import java.util.List;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Optional;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.latency.Resolver;
import org.apache.rocketmq.client.latency.ServiceDetector;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.Address;
@@ -63,7 +62,7 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
+ this.scheduledExecutorService =
ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("TopicRouteService_")
);
this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 8491f4354c..64621dd6c4 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +70,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -142,7 +142,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums,
new ThreadFactoryImpl("NettyClientPublicExecutor_"));
- this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+ this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(32), new
ThreadFactoryImpl("NettyClientScan_thread_"));
if (eventLoopGroup != null) {
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index e626260c93..aa0d46542b 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.BinaryUtil;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -83,7 +84,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -171,7 +171,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
private ScheduledExecutorService buildScheduleExecutor() {
- return new ScheduledThreadPoolExecutor(1,
+ return ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("NettyServerScheduler_", true),
new ThreadPoolExecutor.DiscardOldestPolicy());
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index f2a54ddf69..02ea47f13a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -83,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -205,7 +205,7 @@ public class DefaultMessageStore implements MessageStore {
private ConcurrentMap<String, TopicConfig> topicConfigTable;
private final ScheduledExecutorService scheduledCleanQueueExecutorService =
- Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
+ ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig,
final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final
BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig>
topicConfigTable) throws IOException {
@@ -253,7 +253,7 @@ public class DefaultMessageStore implements MessageStore {
this.transientStorePool = new
TransientStorePool(messageStoreConfig.getTransientStorePoolSize(),
messageStoreConfig.getMappedFileSizeCommitLog());
this.scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
+ ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new
CommitLogDispatcherBuildConsumeQueue());
@@ -2915,7 +2915,7 @@ public class DefaultMessageStore implements MessageStore {
private final ExecutorService batchDispatchRequestExecutor;
public MainBatchDispatchRequestService() {
- batchDispatchRequestExecutor = new ThreadPoolExecutor(
+ batchDispatchRequestExecutor = ThreadUtils.newThreadPoolExecutor(
DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(),
DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(),
1000 * 60,
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index d5393fdca4..f20bc3e280 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -17,10 +17,26 @@
package org.apache.rocketmq.store.ha.autoswitch;
-
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
@@ -36,30 +52,12 @@ import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnection;
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
/**
* SwitchAble ha service, support switch role to master or slave.
*/
public class AutoSwitchHAService extends DefaultHAService {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- private final ExecutorService executorService =
Executors.newSingleThreadExecutor(new
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
+ private final ExecutorService executorService =
ThreadUtils.newSingleThreadExecutor(new
ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
private final ConcurrentHashMap<Long/*brokerId*/,
Long/*lastCaughtUpTimestamp*/> connectionCaughtUpTimeTable = new
ConcurrentHashMap<>();
private final List<Consumer<Set<Long/*brokerId*/>>>
syncStateSetChangedListeners = new ArrayList<>();
private final Set<Long/*brokerId*/> syncStateSet = new HashSet<>();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index b37c907267..639084fa2d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -16,17 +16,25 @@
*/
package org.apache.rocketmq.store.kv;
-import java.util.Random;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.CleanupPolicy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -35,15 +43,6 @@ import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
public class CompactionStore {
public static final String COMPACTION_DIR = "compaction";
@@ -76,7 +75,7 @@ public class CompactionStore {
this.positionMgr = new CompactionPositionMgr(compactionPath);
this.compactionThreadNum =
Math.min(Runtime.getRuntime().availableProcessors(), Math.max(1,
config.getCompactionThreadNum()));
- this.compactionSchedule =
Executors.newScheduledThreadPool(this.compactionThreadNum,
+ this.compactionSchedule =
ThreadUtils.newScheduledThreadPool(this.compactionThreadNum,
new ThreadFactoryImpl("compactionSchedule_"));
this.offsetMapSize = config.getMaxOffsetMapSize() /
compactionThreadNum;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 8d38503b37..d03d15d653 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
@@ -34,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.CommitLog;
@@ -175,7 +175,7 @@ public class ConsumeQueueStore {
}
private ExecutorService buildExecutorService(BlockingQueue<Runnable>
blockingQueue, String threadNamePrefix) {
- return new ThreadPoolExecutor(
+ return ThreadUtils.newThreadPoolExecutor(
this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
1000 * 60,
diff --git
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 2dd3fc5b52..489d7b4fbc 100644
---
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.store.stats;
import java.util.HashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BrokerConfig;
@@ -32,13 +31,14 @@ import
org.apache.rocketmq.common.statistics.StatisticsItemScheduledPrinter;
import org.apache.rocketmq.common.statistics.StatisticsItemStateGetter;
import org.apache.rocketmq.common.statistics.StatisticsKindMeta;
import org.apache.rocketmq.common.statistics.StatisticsManager;
+import org.apache.rocketmq.common.stats.MomentStatsItemSet;
import org.apache.rocketmq.common.stats.Stats;
+import org.apache.rocketmq.common.stats.StatsItem;
+import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.stats.MomentStatsItemSet;
-import org.apache.rocketmq.common.stats.StatsItem;
-import org.apache.rocketmq.common.stats.StatsItemSet;
public class BrokerStatsManager {
@@ -281,11 +281,11 @@ public class BrokerStatsManager {
private void initScheduleService() {
this.scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig));
+ ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig));
this.commercialExecutor =
- Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
+ ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
this.accountExecutor =
- Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
+ ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
}
public MomentStatsItemSet getMomentStatsItemSetFallSize() {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 181f7087ae..0d50de65ae 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -35,7 +35,6 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -54,6 +53,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.ConsumeQueue;
@@ -174,11 +174,11 @@ public class TimerMessageStore {
this.lastBrokerRole = storeConfig.getBrokerRole();
if (messageStore instanceof DefaultMessageStore) {
- scheduler = Executors.newSingleThreadScheduledExecutor(
+ scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("TimerScheduledThread",
((DefaultMessageStore) messageStore).getBrokerIdentity()));
} else {
- scheduler = Executors.newSingleThreadScheduledExecutor(
+ scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("TimerScheduledThread"));
}
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
index f3d105bc6b..080b7e3852 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.annotation.Generated;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 6dd0e8846e..65d586f43d 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -20,10 +20,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.utils.ThreadUtils;
public class TieredStoreExecutor {
@@ -43,20 +43,20 @@ public class TieredStoreExecutor {
public static ExecutorService compactIndexFileExecutor;
public static void init() {
- commonScheduledExecutor = new ScheduledThreadPoolExecutor(
+ commonScheduledExecutor = ThreadUtils.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()),
new ThreadFactoryImpl("TieredCommonExecutor_"));
- commitExecutor = new ScheduledThreadPoolExecutor(
+ commitExecutor = ThreadUtils.newScheduledThreadPool(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
new ThreadFactoryImpl("TieredCommitExecutor_"));
- cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
+ cleanExpiredFileExecutor = ThreadUtils.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()),
new ThreadFactoryImpl("TieredCleanFileExecutor_"));
dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
- dispatchExecutor = new ThreadPoolExecutor(
+ dispatchExecutor = ThreadUtils.newThreadPoolExecutor(
Math.max(2, Runtime.getRuntime().availableProcessors()),
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
1000 * 60,
@@ -66,7 +66,7 @@ public class TieredStoreExecutor {
new ThreadPoolExecutor.DiscardOldestPolicy());
fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
- fetchDataExecutor = new ThreadPoolExecutor(
+ fetchDataExecutor = ThreadUtils.newThreadPoolExecutor(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
Math.max(64, Runtime.getRuntime().availableProcessors() * 8),
1000 * 60,
@@ -75,7 +75,7 @@ public class TieredStoreExecutor {
new ThreadFactoryImpl("TieredFetchExecutor_"));
compactIndexFileThreadPoolQueue = new
LinkedBlockingQueue<>(QUEUE_CAPACITY);
- compactIndexFileExecutor = new ThreadPoolExecutor(
+ compactIndexFileExecutor = ThreadUtils.newThreadPoolExecutor(
1,
1,
1000 * 60,
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index fa3596d51c..1ebff6d8af 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -66,6 +66,7 @@ import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
@@ -193,7 +194,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
int threadPoolCoreSize =
Integer.parseInt(System.getProperty("rocketmq.admin.threadpool.coresize",
"20"));
- this.threadPoolExecutor = new
ThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new
LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
+ this.threadPoolExecutor = (ThreadPoolExecutor)
ThreadUtils.newThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
break;
case RUNNING: