This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6f37957e09 Add notifyLast flag for PopLongPollingService (#7835)
6f37957e09 is described below
commit 6f37957e099aed1cb3b93c541462b1089b99e4db
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Feb 20 14:41:48 2024 +0800
Add notifyLast flag for PopLongPollingService (#7835)
---
.../broker/longpolling/PopLongPollingService.java | 31 +++++++++++++++-------
.../broker/processor/NotificationProcessor.java | 2 +-
.../broker/processor/PopMessageProcessor.java | 2 +-
3 files changed, 24 insertions(+), 11 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index f1bc9adc46..a768fe4b9c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -51,14 +51,16 @@ public class PopLongPollingService extends ServiceThread {
private long lastCleanTime = 0;
private final AtomicLong totalPollingNum = new AtomicLong(0);
+ private final boolean notifyLast;
- public PopLongPollingService(BrokerController brokerController,
NettyRequestProcessor processor) {
+ public PopLongPollingService(BrokerController brokerController,
NettyRequestProcessor processor, boolean notifyLast) {
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
this.topicCidMap = new
ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentSkipListSet<PopRequest>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+ this.notifyLast = notifyLast;
}
@Override
@@ -172,17 +174,10 @@ public class PopLongPollingService extends ServiceThread {
if (remotingCommands == null || remotingCommands.isEmpty()) {
return false;
}
- PopRequest popRequest = remotingCommands.pollFirst();
- //clean inactive channel
- while (popRequest != null && !popRequest.getChannel().isActive()) {
- totalPollingNum.decrementAndGet();
- popRequest = remotingCommands.pollFirst();
- }
-
+ PopRequest popRequest = pollRemotingCommands(remotingCommands);
if (popRequest == null) {
return false;
}
- totalPollingNum.decrementAndGet();
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}",
popRequest);
}
@@ -340,4 +335,22 @@ public class PopLongPollingService extends ServiceThread {
lastCleanTime = System.currentTimeMillis();
}
+
+    /**
+     * Removes and returns the next parked request whose channel is still active.
+     *
+     * <p>Requests are taken from the tail of the set when {@code notifyLast} is
+     * {@code true}, otherwise from the head. Requests whose channel is no longer
+     * active are removed and dropped. {@code totalPollingNum} is decremented once
+     * for every request actually removed from the set, whether it is returned or
+     * dropped.
+     *
+     * @param remotingCommands the parked requests for one polling key; may be {@code null}
+     * @return an active request, or {@code null} if the set is {@code null}, empty,
+     *         or holds only requests with inactive channels
+     */
+    private PopRequest pollRemotingCommands(ConcurrentSkipListSet<PopRequest> remotingCommands) {
+        if (remotingCommands == null || remotingCommands.isEmpty()) {
+            return null;
+        }
+
+        PopRequest popRequest;
+        do {
+            if (notifyLast) {
+                popRequest = remotingCommands.pollLast();
+            } else {
+                popRequest = remotingCommands.pollFirst();
+            }
+            // pollFirst/pollLast return null once the set is drained (for example
+            // after every queued request was dropped as inactive). Only count
+            // removals that really happened, so the counter cannot drift below the
+            // number of requests still parked.
+            if (popRequest != null) {
+                totalPollingNum.decrementAndGet();
+            }
+        } while (popRequest != null && !popRequest.getChannel().isActive());
+
+        return popRequest;
+    }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 0deb3ee707..6447500cbe 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -50,7 +50,7 @@ public class NotificationProcessor implements
NettyRequestProcessor {
public NotificationProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
- this.popLongPollingService = new
PopLongPollingService(brokerController, this);
+ this.popLongPollingService = new
PopLongPollingService(brokerController, this, true);
}
@Override
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 59ff2e0fd5..93c04a1b8d 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -110,7 +110,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
public PopMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.reviveTopic =
PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName());
- this.popLongPollingService = new
PopLongPollingService(brokerController, this);
+ this.popLongPollingService = new
PopLongPollingService(brokerController, this, false);
this.queueLockManager = new QueueLockManager();
this.popBufferMergeService = new
PopBufferMergeService(this.brokerController, this);
this.ckMessageNumber = new AtomicLong();