This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 81884c8  [ROCKETMQ-312] Use independent thread pool for QueryMessageProcessor
81884c8 is described below

commit 81884c8df3009a01c18dcfbf83a5109831878527
Author: yukon <[email protected]>
AuthorDate: Fri Dec 1 19:54:55 2017 +0800

    [ROCKETMQ-312] Use independent thread pool for QueryMessageProcessor
    
    Author: yukon <[email protected]>
    
    Closes #192 from zhouxinyu/ROCKETMQ-312.
---
 .../apache/rocketmq/broker/BrokerController.java   | 29 ++++++++++++++++++----
 .../broker/processor/AdminBrokerProcessor.java     |  6 +++++
 .../org/apache/rocketmq/common/BrokerConfig.java   | 19 ++++++++++++++
 3 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index cd68552..0a6f0b4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -114,6 +114,7 @@ public class BrokerController {
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> pullThreadPoolQueue;
+    private final BlockingQueue<Runnable> queryThreadPoolQueue;
     private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
     private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
     private final FilterServerManager filterServerManager;
@@ -126,6 +127,7 @@ public class BrokerController {
     private TopicConfigManager topicConfigManager;
     private ExecutorService sendMessageExecutor;
     private ExecutorService pullMessageExecutor;
+    private ExecutorService queryMessageExecutor;
     private ExecutorService adminBrokerExecutor;
     private ExecutorService clientManageExecutor;
     private ExecutorService consumerManageExecutor;
@@ -163,8 +165,8 @@ public class BrokerController {
         this.slaveSynchronize = new SlaveSynchronize(this);
 
         this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
-
         this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
         this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
         this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
 
@@ -191,6 +193,10 @@ public class BrokerController {
         return pullThreadPoolQueue;
     }
 
+    public BlockingQueue<Runnable> getQueryThreadPoolQueue() {
+        return queryThreadPoolQueue;
+    }
+
     public boolean initialize() throws CloneNotSupportedException {
         boolean result = this.topicConfigManager.load();
 
@@ -237,6 +243,14 @@ public class BrokerController {
                 this.pullThreadPoolQueue,
                 new ThreadFactoryImpl("PullMessageThread_"));
 
+            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
+                this.brokerConfig.getQueryMessageThreadPoolNums(),
+                this.brokerConfig.getQueryMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.queryThreadPoolQueue,
+                new ThreadFactoryImpl("QueryMessageThread_"));
+
             this.adminBrokerExecutor =
                 Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                     "AdminBrokerThread_"));
@@ -404,11 +418,11 @@ public class BrokerController {
          * QueryMessageProcessor
          */
         NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
-        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
 
-        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
 
         /**
          * ClientManageProcessor
@@ -494,9 +508,14 @@ public class BrokerController {
         return this.headSlowTimeMills(this.pullThreadPoolQueue);
     }
 
+    public long headSlowTimeMills4QueryThreadPoolQueue() {
+        return this.headSlowTimeMills(this.queryThreadPoolQueue);
+    }
+
     public void printWaterMark() {
         LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
         LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
+        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
     }
 
     public MessageStore getMessageStore() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index abea4ec..d69a787 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1205,11 +1205,17 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         runtimeInfo.put("pullThreadPoolQueueCapacity",
             String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
 
+        runtimeInfo.put("queryThreadPoolQueueSize", String.valueOf(this.brokerController.getQueryThreadPoolQueue().size()));
+        runtimeInfo.put("queryThreadPoolQueueCapacity",
+            String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
+
         runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
         runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
 
         runtimeInfo.put("sendThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4SendThreadPoolQueue()));
         runtimeInfo.put("pullThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4PullThreadPoolQueue()));
+        runtimeInfo.put("queryThreadPoolQueueHeadWaitTimeMills", String.valueOf(this.brokerController.headSlowTimeMills4QueryThreadPoolQueue()));
+
         runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
         runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
         if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9a208a3..c344a7c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -59,6 +59,8 @@ public class BrokerConfig {
      */
     private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
     private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+    private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
+
     private int adminBrokerThreadPoolNums = 16;
     private int clientManageThreadPoolNums = 32;
     private int consumerManageThreadPoolNums = 32;
@@ -73,6 +75,7 @@ public class BrokerConfig {
     private boolean fetchNamesrvAddrByAddressServer = false;
     private int sendThreadPoolQueueCapacity = 10000;
     private int pullThreadPoolQueueCapacity = 100000;
+    private int queryThreadPoolQueueCapacity = 20000;
     private int clientManagerThreadPoolQueueCapacity = 1000000;
     private int consumerManagerThreadPoolQueueCapacity = 1000000;
 
@@ -306,6 +309,14 @@ public class BrokerConfig {
         this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
     }
 
+    public int getQueryMessageThreadPoolNums() {
+        return queryMessageThreadPoolNums;
+    }
+
+    public void setQueryMessageThreadPoolNums(final int queryMessageThreadPoolNums) {
+        this.queryMessageThreadPoolNums = queryMessageThreadPoolNums;
+    }
+
     public int getAdminBrokerThreadPoolNums() {
         return adminBrokerThreadPoolNums;
     }
@@ -394,6 +405,14 @@ public class BrokerConfig {
         this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
     }
 
+    public int getQueryThreadPoolQueueCapacity() {
+        return queryThreadPoolQueueCapacity;
+    }
+
+    public void setQueryThreadPoolQueueCapacity(final int queryThreadPoolQueueCapacity) {
+        this.queryThreadPoolQueueCapacity = queryThreadPoolQueueCapacity;
+    }
+
     public boolean isBrokerTopicEnable() {
         return brokerTopicEnable;
     }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to