This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 81884c8 [ROCKETMQ-312] Use independent thread pool for
QueryMessageProcessor
81884c8 is described below
commit 81884c8df3009a01c18dcfbf83a5109831878527
Author: yukon <[email protected]>
AuthorDate: Fri Dec 1 19:54:55 2017 +0800
[ROCKETMQ-312] Use independent thread pool for QueryMessageProcessor
Author: yukon <[email protected]>
Closes #192 from zhouxinyu/ROCKETMQ-312.
---
.../apache/rocketmq/broker/BrokerController.java | 29 ++++++++++++++++++----
.../broker/processor/AdminBrokerProcessor.java | 6 +++++
.../org/apache/rocketmq/common/BrokerConfig.java | 19 ++++++++++++++
3 files changed, 49 insertions(+), 5 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index cd68552..0a6f0b4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -114,6 +114,7 @@ public class BrokerController {
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
+ private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final FilterServerManager filterServerManager;
@@ -126,6 +127,7 @@ public class BrokerController {
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
+ private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
private ExecutorService consumerManageExecutor;
@@ -163,8 +165,8 @@ public class BrokerController {
this.slaveSynchronize = new SlaveSynchronize(this);
this.sendThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
-
this.pullThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+ this.queryThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
@@ -191,6 +193,10 @@ public class BrokerController {
return pullThreadPoolQueue;
}
+ public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
+ return queryThreadPoolQueue;
+ }
+
public boolean initialize() throws CloneNotSupportedException {
boolean result = this.topicConfigManager.load();
@@ -237,6 +243,14 @@ public class BrokerController {
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
+ this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.brokerConfig.getQueryMessageThreadPoolNums(),
+ this.brokerConfig.getQueryMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.queryThreadPoolQueue,
+ new ThreadFactoryImpl("QueryMessageThread_"));
+
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(),
new ThreadFactoryImpl(
"AdminBrokerThread_"));
@@ -404,11 +418,11 @@ public class BrokerController {
* QueryMessageProcessor
*/
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
- this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,
queryProcessor, this.pullMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,
queryProcessor, this.pullMessageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,
queryProcessor, this.queryMessageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,
queryProcessor, this.queryMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,
queryProcessor, this.pullMessageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,
queryProcessor, this.pullMessageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE,
queryProcessor, this.queryMessageExecutor);
+
this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID,
queryProcessor, this.queryMessageExecutor);
/**
* ClientManageProcessor
@@ -494,9 +508,14 @@ public class BrokerController {
return this.headSlowTimeMills(this.pullThreadPoolQueue);
}
+ public long headSlowTimeMills4QueryThreadPoolQueue() {
+ return this.headSlowTimeMills(this.queryThreadPoolQueue);
+ }
+
public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills:
{}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills:
{}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
+ LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills:
{}", this.queryThreadPoolQueue.size(),
headSlowTimeMills4QueryThreadPoolQueue());
}
public MessageStore getMessageStore() {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index abea4ec..d69a787 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1205,11 +1205,17 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
runtimeInfo.put("pullThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
+ runtimeInfo.put("queryThreadPoolQueueSize",
String.valueOf(this.brokerController.getQueryThreadPoolQueue().size()));
+ runtimeInfo.put("queryThreadPoolQueueCapacity",
+
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
+
runtimeInfo.put("dispatchBehindBytes",
String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills",
String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills",
String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills",
String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue()));
+ runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills",
String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue()));
+
runtimeInfo.put("earliestMessageTimeStamp",
String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
runtimeInfo.put("startAcceptSendRequestTimeStamp",
String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
if (this.brokerController.getMessageStore() instanceof
DefaultMessageStore) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9a208a3..c344a7c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -59,6 +59,8 @@ public class BrokerConfig {
*/
private int sendMessageThreadPoolNums = 1; //16 +
Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 +
Runtime.getRuntime().availableProcessors() * 2;
+ private int queryMessageThreadPoolNums = 8 +
Runtime.getRuntime().availableProcessors();
+
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
@@ -73,6 +75,7 @@ public class BrokerConfig {
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
+ private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
@@ -306,6 +309,14 @@ public class BrokerConfig {
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}
+ public int getQueryMessageThreadPoolNums() {
+ return queryMessageThreadPoolNums;
+ }
+
+ public void setQueryMessageThreadPoolNums(final int
queryMessageThreadPoolNums) {
+ this.queryMessageThreadPoolNums = queryMessageThreadPoolNums;
+ }
+
public int getAdminBrokerThreadPoolNums() {
return adminBrokerThreadPoolNums;
}
@@ -394,6 +405,14 @@ public class BrokerConfig {
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
+ public int getQueryThreadPoolQueueCapacity() {
+ return queryThreadPoolQueueCapacity;
+ }
+
+ public void setQueryThreadPoolQueueCapacity(final int
queryThreadPoolQueueCapacity) {
+ this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
+ }
+
public boolean isBrokerTopicEnable() {
return brokerTopicEnable;
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].