This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6f37957e09 Add notifyLast flag for PopLongPollingService (#7835)
6f37957e09 is described below

commit 6f37957e099aed1cb3b93c541462b1089b99e4db
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Feb 20 14:41:48 2024 +0800

    Add notifyLast flag for PopLongPollingService (#7835)
---
 .../broker/longpolling/PopLongPollingService.java  | 31 +++++++++++++++-------
 .../broker/processor/NotificationProcessor.java    |  2 +-
 .../broker/processor/PopMessageProcessor.java      |  2 +-
 3 files changed, 24 insertions(+), 11 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index f1bc9adc46..a768fe4b9c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -51,14 +51,16 @@ public class PopLongPollingService extends ServiceThread {
     private long lastCleanTime = 0;
 
     private final AtomicLong totalPollingNum = new AtomicLong(0);
+    private final boolean notifyLast;
 
-    public PopLongPollingService(BrokerController brokerController, 
NettyRequestProcessor processor) {
+    public PopLongPollingService(BrokerController brokerController, 
NettyRequestProcessor processor, boolean notifyLast) {
         this.brokerController = brokerController;
         this.processor = processor;
         // 100000 topic default,  100000 lru topic + cid + qid
         this.topicCidMap = new 
ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
         this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentSkipListSet<PopRequest>>()
             
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+        this.notifyLast = notifyLast;
     }
 
     @Override
@@ -172,17 +174,10 @@ public class PopLongPollingService extends ServiceThread {
         if (remotingCommands == null || remotingCommands.isEmpty()) {
             return false;
         }
-        PopRequest popRequest = remotingCommands.pollFirst();
-        //clean inactive channel
-        while (popRequest != null && !popRequest.getChannel().isActive()) {
-            totalPollingNum.decrementAndGet();
-            popRequest = remotingCommands.pollFirst();
-        }
-
+        PopRequest popRequest = pollRemotingCommands(remotingCommands);
         if (popRequest == null) {
             return false;
         }
-        totalPollingNum.decrementAndGet();
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", 
popRequest);
         }
@@ -340,4 +335,22 @@ public class PopLongPollingService extends ServiceThread {
 
         lastCleanTime = System.currentTimeMillis();
     }
+
+    private PopRequest pollRemotingCommands(ConcurrentSkipListSet<PopRequest> 
remotingCommands) {
+        if (remotingCommands == null || remotingCommands.isEmpty()) {
+            return null;
+        }
+
+        PopRequest popRequest;
+        do {
+            if (notifyLast) {
+                popRequest = remotingCommands.pollLast();
+            } else {
+                popRequest = remotingCommands.pollFirst();
+            }
+            totalPollingNum.decrementAndGet();
+        } while (popRequest != null && !popRequest.getChannel().isActive());
+
+        return popRequest;
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 0deb3ee707..6447500cbe 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -50,7 +50,7 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
 
     public NotificationProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
-        this.popLongPollingService = new 
PopLongPollingService(brokerController, this);
+        this.popLongPollingService = new 
PopLongPollingService(brokerController, this, true);
     }
 
     @Override
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 59ff2e0fd5..93c04a1b8d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -110,7 +110,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
     public PopMessageProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
         this.reviveTopic = 
PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName());
-        this.popLongPollingService = new 
PopLongPollingService(brokerController, this);
+        this.popLongPollingService = new 
PopLongPollingService(brokerController, this, false);
         this.queueLockManager = new QueueLockManager();
         this.popBufferMergeService = new 
PopBufferMergeService(this.brokerController, this);
         this.ckMessageNumber = new AtomicLong();

Reply via email to