This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bf343a3 [ISSUE #3225]change Random to ThreadLocalRandom in broker
bf343a3 is described below
commit bf343a31a94be1d2eb420a22c647d947f26c8e5d
Author: racoon <[email protected]>
AuthorDate: Mon Sep 13 13:59:37 2021 +0800
[ISSUE #3225]change Random to ThreadLocalRandom in broker
---
.../broker/processor/AbstractSendMessageProcessor.java | 6 +++---
.../rocketmq/broker/processor/ReplyMessageProcessor.java | 4 +++-
.../rocketmq/broker/processor/SendMessageProcessor.java | 11 ++++++-----
.../AbstractTransactionalMessageCheckListener.java | 2 --
.../queue/DefaultTransactionalMessageCheckListener.java | 4 +++-
5 files changed, 15 insertions(+), 12 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 35f8660..9d26e99 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -22,7 +22,8 @@ import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -60,7 +61,6 @@ public abstract class AbstractSendMessageProcessor extends
AsyncNettyRequestProc
protected final static int DLQ_NUMS_PER_GROUP = 1;
protected final BrokerController brokerController;
- protected final Random random = new Random(System.currentTimeMillis());
protected final SocketAddress storeHost;
private List<SendMessageHook> sendMessageHookList;
@@ -109,7 +109,7 @@ public abstract class AbstractSendMessageProcessor extends
AsyncNettyRequestProc
final SendMessageRequestHeader requestHeader, final byte[] body,
TopicConfig topicConfig) {
int queueIdInt = requestHeader.getQueueId();
if (queueIdInt < 0) {
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) %
topicConfig.getWriteQueueNums();
+ queueIdInt = ThreadLocalRandom.current().nextInt(99999999) %
topicConfig.getWriteQueueNums();
}
int sysFlag = requestHeader.getSysFlag();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index 2890fc4..f31576f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -45,6 +45,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import java.util.concurrent.ThreadLocalRandom;
+
public class ReplyMessageProcessor extends AbstractSendMessageProcessor
implements NettyRequestProcessor {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -125,7 +127,7 @@ public class ReplyMessageProcessor extends
AbstractSendMessageProcessor implemen
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) %
topicConfig.getWriteQueueNums();
+ queueIdInt = ThreadLocalRandom.current().nextInt(99999999) %
topicConfig.getWriteQueueNums();
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 2cd142f..6942c88 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
@@ -141,7 +142,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
}
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
- int queueIdInt = Math.abs(this.random.nextInt() % 99999999) %
subscriptionGroupConfig.getRetryQueueNums();
+ int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) %
subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
@@ -188,7 +189,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) %
DLQ_NUMS_PER_GROUP;
+ queueIdInt = ThreadLocalRandom.current().nextInt(99999999) %
DLQ_NUMS_PER_GROUP;
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
@@ -353,7 +354,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0
: requestHeader.getReconsumeTimes();
if (reconsumeTimes >= maxReconsumeTimes) {
newTopic = MixAll.getDLQTopic(groupName);
- int queueIdInt = Math.abs(this.random.nextInt() % 99999999) %
DLQ_NUMS_PER_GROUP;
+ int queueIdInt = ThreadLocalRandom.current().nextInt(99999999)
% DLQ_NUMS_PER_GROUP;
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
@@ -410,7 +411,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) %
topicConfig.getWriteQueueNums();
+ queueIdInt = ThreadLocalRandom.current().nextInt(99999999) %
topicConfig.getWriteQueueNums();
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
@@ -666,7 +667,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
}
private int randomQueueId(int writeQueueNums) {
- return (this.random.nextInt() % 99999999) % writeQueueNums;
+ return ThreadLocalRandom.current().nextInt(99999999) % writeQueueNums;
}
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand
request,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 4cf5647..0079fb5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.transaction;
import io.netty.channel.Channel;
-import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
@@ -40,7 +39,6 @@ public abstract class
AbstractTransactionalMessageCheckListener {
//queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC
protected final static int TCMT_QUEUE_NUMS = 1;
- protected final Random random = new Random(System.currentTimeMillis());
private static ExecutorService executorService = new ThreadPoolExecutor(2,
5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new
ThreadFactory() {
@Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
index ee87bd3..a28e332 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
@@ -30,6 +30,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
+import java.util.concurrent.ThreadLocalRandom;
+
public class DefaultTransactionalMessageCheckListener extends
AbstractTransactionalMessageCheckListener {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
@@ -58,7 +60,7 @@ public class DefaultTransactionalMessageCheckListener extends
AbstractTransactio
private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
TopicConfig topicConfig =
this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS,
PermName.PERM_READ | PermName.PERM_WRITE);
- int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
+ int queueId = ThreadLocalRandom.current().nextInt(99999999) %
TCMT_QUEUE_NUMS;
MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTopic(topicConfig.getTopicName());
inner.setBody(msgExt.getBody());