This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 5.0.0-alpha in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit fe88fb36a3e1790cdb33ca670306c2adea280ba3 Author: cserwen <[email protected]> AuthorDate: Wed Jan 12 21:00:56 2022 +0800 [ISSUE #3498] Make messages in reviveTopic more evenly written to different queues #3499 --- .../org/apache/rocketmq/broker/processor/PopMessageProcessor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index aa97fc8..fcc972d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -92,6 +92,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { private PopLongPollingService popLongPollingService; private PopBufferMergeService popBufferMergeService; private QueueLockManager queueLockManager; + private AtomicLong ckMessageNumber; public PopMessageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; @@ -104,6 +105,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { this.popLongPollingService = new PopLongPollingService(); this.queueLockManager = new QueueLockManager(); this.popBufferMergeService = new PopBufferMergeService(this.brokerController, this); + this.ckMessageNumber = new AtomicLong(); } public PopLongPollingService getPopLongPollingService() { @@ -350,7 +352,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { if (requestHeader.isOrder()) { reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE; } else { - reviveQid = randomQ % this.brokerController.getBrokerConfig().getReviveQueueNum(); + reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum()); } GetMessageResult getMessageResult = new GetMessageResult();
