This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new efa8e45 [ISSUE #396]Use separated thread pool and add monitor tools
for transactional message (#397)
efa8e45 is described below
commit efa8e457e84a7406923594b9516545661b56765e
Author: duheng <[email protected]>
AuthorDate: Fri Aug 17 11:20:05 2018 +0800
[ISSUE #396]Use separated thread pool and add monitor tools for
transactional message (#397)
* Use separate threadpool and add monitor tools for transaction
* Modify log level
---
.../apache/rocketmq/broker/BrokerController.java | 34 ++++++++++++++++++---
.../rocketmq/broker/latency/BrokerFastFailure.java | 3 ++
.../broker/processor/AdminBrokerProcessor.java | 4 +++
.../rocketmq/broker/util/ServiceProvider.java | 32 ++++++++++----------
.../org/apache/rocketmq/common/BrokerConfig.java | 35 ++++++++++++++++++++--
5 files changed, 87 insertions(+), 21 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index f45674d..a206922 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
public class BrokerController {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger LOG_PROTECTION =
InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -131,6 +130,7 @@ public class BrokerController {
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
+ private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new
ArrayList<SendMessageHook>();
@@ -146,6 +146,7 @@ public class BrokerController {
private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
+ private ExecutorService endTransactionExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
@@ -189,6 +190,7 @@ public class BrokerController {
this.clientManagerThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
+ this.endTransactionThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new
BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new
InetSocketAddress(this.getBrokerConfig().getBrokerIP1(),
this.getNettyServerConfig().getListenPort()));
@@ -289,8 +291,15 @@ public class BrokerController {
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
- new ThreadFactoryImpl("HeartbeatThread_",true));
+ new ThreadFactoryImpl("HeartbeatThread_", true));
+ this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
+ this.brokerConfig.getEndTransactionThreadPoolNums(),
+ this.brokerConfig.getEndTransactionThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.endTransactionThreadPoolQueue,
+ new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(),
new ThreadFactoryImpl(
@@ -536,8 +545,8 @@ public class BrokerController {
/**
* EndTransactionProcessor
*/
- this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new
EndTransactionProcessor(this), this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION,
new EndTransactionProcessor(this), this.sendMessageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new
EndTransactionProcessor(this), this.endTransactionExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION,
new EndTransactionProcessor(this), this.endTransactionExecutor);
/**
* Default
@@ -598,10 +607,15 @@ public class BrokerController {
return this.headSlowTimeMills(this.queryThreadPoolQueue);
}
+ public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
+ return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
+ }
+
public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills:
{}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills:
{}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills:
{}", this.queryThreadPoolQueue.size(),
headSlowTimeMills4QueryThreadPoolQueue());
+ LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {}
SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(),
headSlowTimeMills4EndTransactionThreadPoolQueue());
}
public MessageStore getMessageStore() {
@@ -741,6 +755,14 @@ public class BrokerController {
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
+
+ if (this.transactionalMessageCheckService != null) {
+ this.transactionalMessageCheckService.shutdown();
+ }
+
+ if (this.endTransactionExecutor != null) {
+ this.endTransactionExecutor.shutdown();
+ }
}
private void unregisterBrokerAll() {
@@ -1027,4 +1049,8 @@ public class BrokerController {
AbstractTransactionalMessageCheckListener
transactionalMessageCheckListener) {
this.transactionalMessageCheckListener =
transactionalMessageCheckListener;
}
+
+ public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
+ return endTransactionThreadPoolQueue;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0a8beca..a018f68 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -92,6 +92,9 @@ public class BrokerFastFailure {
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
+
+ cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
+ .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable>
blockingQueue, final long maxWaitTimeMillsInQueue) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 1a704a8..356aafc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
runtimeInfo.put("queryThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
+ runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
+ runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",
+ String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity()));
+
runtimeInfo.put("dispatchBehindBytes",
String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills",
String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
index 59be7a7..8b9b63e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
@@ -125,23 +125,25 @@ public class ServiceProvider {
public static <T> T loadClass(String name, Class<?> clazz) {
final InputStream is = getResourceAsStream(getContextClassLoader(),
name);
- BufferedReader reader;
- try {
+ if (is != null) {
+ BufferedReader reader;
try {
- reader = new BufferedReader(new InputStreamReader(is,
"UTF-8"));
- } catch (java.io.UnsupportedEncodingException e) {
- reader = new BufferedReader(new InputStreamReader(is));
- }
- String serviceName = reader.readLine();
- reader.close();
- if (serviceName != null && !"".equals(serviceName)) {
- return initService(getContextClassLoader(), serviceName,
clazz);
- } else {
- LOG.warn("ServiceName is empty!");
- return null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(is,
"UTF-8"));
+ } catch (java.io.UnsupportedEncodingException e) {
+ reader = new BufferedReader(new InputStreamReader(is));
+ }
+ String serviceName = reader.readLine();
+ reader.close();
+ if (serviceName != null && !"".equals(serviceName)) {
+ return initService(getContextClassLoader(), serviceName,
clazz);
+ } else {
+ LOG.warn("ServiceName is empty!");
+ return null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Error occurred when looking for resource file " +
name, e);
}
- } catch (Exception e) {
- LOG.error("Error occured when looking for resource file " + name,
e);
}
return null;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 442f456..963c88a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -63,7 +63,12 @@ public class BrokerConfig {
private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
- private int heartbeatThreadPoolNums =
Math.min(32,Runtime.getRuntime().availableProcessors());
+ private int heartbeatThreadPoolNums = Math.min(32,
Runtime.getRuntime().availableProcessors());
+
+ /**
+ * Thread numbers for EndTransactionProcessor
+ */
+ private int endTransactionThreadPoolNums = 8 +
Runtime.getRuntime().availableProcessors() * 2;
private int flushConsumerOffsetInterval = 1000 * 5;
@@ -79,6 +84,7 @@ public class BrokerConfig {
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
+ private int endTransactionPoolQueueCapacity = 100000;
private int filterServerNums = 0;
@@ -111,6 +117,7 @@ public class BrokerConfig {
private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
+ private long waitTimeMillsInTransactionQueue = 3 * 1000;
private long startAcceptSendRequestTimeStamp = 0L;
@@ -156,7 +163,7 @@ public class BrokerConfig {
* The maximum number of times the message was checked, if exceed this
value, this message will be discarded.
*/
@ImportantField
- private int transactionCheckMax = 5;
+ private int transactionCheckMax = 15;
/**
* Transaction message check interval.
@@ -701,4 +708,28 @@ public class BrokerConfig {
public void setTransactionCheckInterval(long transactionCheckInterval) {
this.transactionCheckInterval = transactionCheckInterval;
}
+
+ public int getEndTransactionThreadPoolNums() {
+ return endTransactionThreadPoolNums;
+ }
+
+ public void setEndTransactionThreadPoolNums(int
endTransactionThreadPoolNums) {
+ this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
+ }
+
+ public int getEndTransactionPoolQueueCapacity() {
+ return endTransactionPoolQueueCapacity;
+ }
+
+ public void setEndTransactionPoolQueueCapacity(int
endTransactionPoolQueueCapacity) {
+ this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
+ }
+
+ public long getWaitTimeMillsInTransactionQueue() {
+ return waitTimeMillsInTransactionQueue;
+ }
+
+ public void setWaitTimeMillsInTransactionQueue(long
waitTimeMillsInTransactionQueue) {
+ this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
+ }
}