This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6fa05ff6f9 [ISSUE #9921] Limit the concurrency of Pop revive (#9922)
6fa05ff6f9 is described below

commit 6fa05ff6f99e7959e0bc1ef50755c7e199e0ccbf
Author: lizhimins <[email protected]>
AuthorDate: Thu Dec 18 20:42:52 2025 +0800

    [ISSUE #9921] Limit the concurrency of Pop revive (#9922)
---
 .../rocketmq/broker/pop/PopConsumerService.java      | 20 ++++++++++++++++++--
 .../org/apache/rocketmq/common/BrokerConfig.java     |  9 +++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 7476a6c206..03fdef19a2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -65,6 +65,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -533,6 +534,7 @@ public class PopConsumerService extends ServiceThread {
             });
     }
 
+    @SuppressWarnings("StatementWithEmptyBody")
     public void clearCache(String groupId, String topicId, int queueId) {
         while (consumerLockService.tryLock(groupId, topicId)) {
         }
@@ -551,12 +553,26 @@ public class PopConsumerService extends ServiceThread {
         List<PopConsumerRecord> consumerRecords = this.popConsumerStore.scanExpiredRecords(
                 currentTime.get() - TimeUnit.SECONDS.toMillis(3), upperTime, maxCount);
         long scanCostTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+        // When reading messages from local storage, the current thread is used
+        // directly for data retrieval. When reading original messages from remote
+        // storage (such as distributed file systems), concurrency needs to be
+        // controlled via a semaphore.
+        Semaphore semaphore = new Semaphore(brokerConfig.getPopReviveConcurrency());
         Queue<PopConsumerRecord> failureList = new LinkedBlockingQueue<>();
         List<CompletableFuture<?>> futureList = new ArrayList<>(consumerRecords.size());
 
         // could merge read operation here
         for (PopConsumerRecord record : consumerRecords) {
-            futureList.add(this.revive(record).thenAccept(result -> {
+            CompletableFuture<Boolean> future;
+            try {
+                semaphore.acquire();
+                future = this.revive(record);
+            } catch (Exception e) {
+                semaphore.release();
+                throw new RuntimeException(e);
+            }
+            futureList.add(future.thenAccept(result -> {
                 if (!result) {
                     if (record.getAttemptTimes() < brokerConfig.getPopReviveMaxAttemptTimes()) {
                         long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS[
@@ -572,7 +588,7 @@ public class PopConsumerService extends ServiceThread {
                         log.error("PopConsumerService drop record, message may be lost, record={}", record);
                     }
                 }
-            }));
+            }).whenComplete((result, ex) -> semaphore.release()));
         }
 
         CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 5142ed12be..8b5a43fd35 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -249,6 +249,7 @@ public class BrokerConfig extends BrokerIdentity {
     private boolean popConsumerKVServiceInit = false;
     private boolean popConsumerKVServiceEnable = false;
     private int popReviveMaxReturnSizePerRead = 16 * 1024;
+    private int popReviveConcurrency = 32;
     private int popReviveMaxAttemptTimes = 16;
     // each message queue will have a corresponding retry queue
     private boolean useSeparateRetryQueue = false;
@@ -674,6 +675,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.popConsumerKVServiceEnable = popConsumerKVServiceEnable;
     }
 
+    public int getPopReviveConcurrency() {
+        return popReviveConcurrency;
+    }
+
+    public void setPopReviveConcurrency(int popReviveConcurrency) {
+        this.popReviveConcurrency = popReviveConcurrency;
+    }
+
     public int getPopReviveMaxReturnSizePerRead() {
         return popReviveMaxReturnSizePerRead;
     }

Reply via email to