This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6fa05ff6f9 [ISSUE #9921] Limit the concurrency of Pop revive (#9922)
6fa05ff6f9 is described below
commit 6fa05ff6f99e7959e0bc1ef50755c7e199e0ccbf
Author: lizhimins <[email protected]>
AuthorDate: Thu Dec 18 20:42:52 2025 +0800
[ISSUE #9921] Limit the concurrency of Pop revive (#9922)
---
.../rocketmq/broker/pop/PopConsumerService.java | 20 ++++++++++++++++++--
.../org/apache/rocketmq/common/BrokerConfig.java | 9 +++++++++
2 files changed, 27 insertions(+), 2 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 7476a6c206..03fdef19a2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -65,6 +65,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -533,6 +534,7 @@ public class PopConsumerService extends ServiceThread {
});
}
+ @SuppressWarnings("StatementWithEmptyBody")
public void clearCache(String groupId, String topicId, int queueId) {
while (consumerLockService.tryLock(groupId, topicId)) {
}
@@ -551,12 +553,26 @@ public class PopConsumerService extends ServiceThread {
List<PopConsumerRecord> consumerRecords =
this.popConsumerStore.scanExpiredRecords(
currentTime.get() - TimeUnit.SECONDS.toMillis(3), upperTime,
maxCount);
long scanCostTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ // When reading messages from local storage, the current thread is used
+ // directly for data retrieval. When reading original messages from
remote
+ // storage (such as distributed file systems), so concurrency needs to
be
+ // controlled via semaphore.
+ Semaphore semaphore = new
Semaphore(brokerConfig.getPopReviveConcurrency());
Queue<PopConsumerRecord> failureList = new LinkedBlockingQueue<>();
List<CompletableFuture<?>> futureList = new
ArrayList<>(consumerRecords.size());
// could merge read operation here
for (PopConsumerRecord record : consumerRecords) {
- futureList.add(this.revive(record).thenAccept(result -> {
+ CompletableFuture<Boolean> future;
+ try {
+ semaphore.acquire();
+ future = this.revive(record);
+ } catch (Exception e) {
+ semaphore.release();
+ throw new RuntimeException(e);
+ }
+ futureList.add(future.thenAccept(result -> {
if (!result) {
if (record.getAttemptTimes() <
brokerConfig.getPopReviveMaxAttemptTimes()) {
long backoffInterval = 1000L *
REWRITE_INTERVALS_IN_SECONDS[
@@ -572,7 +588,7 @@ public class PopConsumerService extends ServiceThread {
log.error("PopConsumerService drop record, message may
be lost, record={}", record);
}
}
- }));
+ }).whenComplete((result, ex) -> semaphore.release()));
}
CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0])).join();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 5142ed12be..8b5a43fd35 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -249,6 +249,7 @@ public class BrokerConfig extends BrokerIdentity {
private boolean popConsumerKVServiceInit = false;
private boolean popConsumerKVServiceEnable = false;
private int popReviveMaxReturnSizePerRead = 16 * 1024;
+ private int popReviveConcurrency = 32;
private int popReviveMaxAttemptTimes = 16;
// each message queue will have a corresponding retry queue
private boolean useSeparateRetryQueue = false;
@@ -674,6 +675,14 @@ public class BrokerConfig extends BrokerIdentity {
this.popConsumerKVServiceEnable = popConsumerKVServiceEnable;
}
+ public int getPopReviveConcurrency() {
+ return popReviveConcurrency;
+ }
+
+ public void setPopReviveConcurrency(int popReviveConcurrency) {
+ this.popReviveConcurrency = popReviveConcurrency;
+ }
+
public int getPopReviveMaxReturnSizePerRead() {
return popReviveMaxReturnSizePerRead;
}