This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9c3e7fe273 [ISSUE #9847] Reduce lock contention on the HandleData object to prevent threads from hanging (#9848)
9c3e7fe273 is described below

commit 9c3e7fe27386a4dbf1e921b1784a73e5b9a1158a
Author: qianye <[email protected]>
AuthorDate: Fri Nov 14 15:11:40 2025 +0800

    [ISSUE #9847] Reduce lock contention on the HandleData object to prevent threads from hanging (#9848)
---
 .../rocketmq/proxy/common/ReceiptHandleGroup.java  | 17 ++++++++++--
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 30 ++++++++++++++--------
 .../receipt/DefaultReceiptHandleManager.java       | 29 +++++++++++++++++----
 3 files changed, 58 insertions(+), 18 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index 15da628dc3..a35eaa5886 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -200,6 +200,14 @@ public class ReceiptHandleGroup {
         return this.receiptHandleMap.isEmpty();
     }
 
+    public long getHandleNum() {
+        long handleNum = 0L;
+        for (Map.Entry<String, Map<HandleKey, HandleData>> entry : 
receiptHandleMap.entrySet()) {
+            handleNum += entry.getValue().size();
+        }
+        return handleNum;
+    }
+
     public MessageReceiptHandle get(String msgID, String handle) {
         Map<HandleKey, HandleData> handleMap = 
this.receiptHandleMap.get(msgID);
         if (handleMap == null) {
@@ -268,13 +276,18 @@ public class ReceiptHandleGroup {
 
     public void computeIfPresent(String msgID, String handle,
         Function<MessageReceiptHandle, 
CompletableFuture<MessageReceiptHandle>> function) {
+        long timeout = 
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+        computeIfPresent(msgID, handle, function, timeout);
+    }
+
+    public void computeIfPresent(String msgID, String handle,
+        Function<MessageReceiptHandle, 
CompletableFuture<MessageReceiptHandle>> function, long lockTimeout) {
         Map<HandleKey, HandleData> handleMap = 
this.receiptHandleMap.get(msgID);
         if (handleMap == null) {
             return;
         }
-        long timeout = 
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
         handleMap.computeIfPresent(new HandleKey(handle), (handleKey, 
handleData) -> {
-            Long lockTimeMs = handleData.lock(timeout);
+            Long lockTimeMs = handleData.lock(lockTimeout);
             if (lockTimeMs == null) {
                 throw new 
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute 
failed");
             }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index a99b0afc35..bc1919c07a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -17,17 +17,6 @@
 
 package org.apache.rocketmq.proxy.config;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.metrics.MetricsExporterType;
-import org.apache.rocketmq.common.utils.NetworkUtil;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.ProxyMode;
-import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.time.Duration;
@@ -38,6 +27,16 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.ProxyMode;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 
 public class ProxyConfig implements ConfigFile {
     private final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -204,6 +203,7 @@ public class ProxyConfig implements ConfigFile {
     private long renewAheadTimeMillis = TimeUnit.SECONDS.toMillis(10);
     private long renewMaxTimeMillis = TimeUnit.HOURS.toMillis(3);
     private long renewSchedulePeriodMillis = TimeUnit.SECONDS.toMillis(5);
+    private int returnHandleGroupThreadPoolNums = 2;
 
     private boolean enableAclRpcHookForClusterMode = false;
 
@@ -1537,4 +1537,12 @@ public class ProxyConfig implements ConfigFile {
     public void setEnableMessageBodyEmptyCheck(boolean 
enableMessageBodyEmptyCheck) {
         this.enableMessageBodyEmptyCheck = enableMessageBodyEmptyCheck;
     }
+
+    public int getReturnHandleGroupThreadPoolNums() {
+        return returnHandleGroupThreadPoolNums;
+    }
+
+    public void setReturnHandleGroupThreadPoolNums(int 
returnHandleGroupThreadPoolNums) {
+        this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums;
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 0cb519306e..522ab2b6da 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.state.StateEventListener;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.common.utils.ExceptionUtils;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -53,7 +54,6 @@ import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
 import org.apache.rocketmq.proxy.common.RenewEvent;
 import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
-import org.apache.rocketmq.common.utils.ExceptionUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
@@ -70,6 +70,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
     protected final ScheduledExecutorService scheduledExecutorService =
         ThreadUtils.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("RenewalScheduledThread_"));
     protected final ThreadPoolExecutor renewalWorkerService;
+    protected final ThreadPoolExecutor returnHandleGroupWorkerService;
 
     public DefaultReceiptHandleManager(MetadataService metadataService, 
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
         this.metadataService = metadataService;
@@ -83,6 +84,13 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
             "RenewalWorkerThread",
             proxyConfig.getRenewThreadPoolQueueCapacity()
         );
+        this.returnHandleGroupWorkerService = 
ThreadPoolMonitor.createAndMonitor(
+            proxyConfig.getReturnHandleGroupThreadPoolNums(),
+            proxyConfig.getReturnHandleGroupThreadPoolNums() * 2,
+            1, TimeUnit.MINUTES,
+            "ReturnHandleGroupWorkerThread",
+            proxyConfig.getRenewThreadPoolQueueCapacity()
+        );
         consumerManager.appendConsumerIdsChangeListener(new 
ConsumerIdsChangeListener() {
             @Override
             public void handle(ConsumerGroupEvent event, String group, 
Object... args) {
@@ -172,7 +180,7 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
 
     protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey 
key, ReceiptHandleGroup group, String msgID, String handleStr) {
         try {
-            group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> 
startRenewMessage(context, key, messageReceiptHandle));
+            group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> 
startRenewMessage(context, key, messageReceiptHandle), 0);
         } catch (Exception e) {
             log.error("error when renew message. msgID:{}, handleStr:{}", 
msgID, handleStr, e);
         }
@@ -237,22 +245,33 @@ public class DefaultReceiptHandleManager extends 
AbstractStartAndShutdown implem
         if (key == null) {
             return;
         }
-        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
-        if (handleGroup == null) {
+        returnHandleGroupWorkerService.submit(() -> returnHandleGroup(key, 
handleGroup));
+    }
+
+    // There is no longer any waiting for lock, and only the locked handles 
will be processed immediately,
+    // while the handles that cannot be acquired will be kept waiting for the 
next scheduling.
+    private void returnHandleGroup(ReceiptHandleGroupKey key, 
ReceiptHandleGroup handleGroup) {
+        if (handleGroup == null || handleGroup.isEmpty()) {
             return;
         }
+        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
         handleGroup.scan((msgID, handle, v) -> {
             try {
                 handleGroup.computeIfPresent(msgID, handle, 
messageReceiptHandle -> {
                     CompletableFuture<AckResult> future = new 
CompletableFuture<>();
                     eventListener.fireEvent(new RenewEvent(key, 
messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(), 
RenewEvent.EventType.CLEAR_GROUP, future));
                     return CompletableFuture.completedFuture(null);
-                });
+                }, 0);
             } catch (Exception e) {
                 log.error("error when clear handle for group. key:{}", key, e);
             }
         });
+        // scheduleRenewTask will trigger cleanup again
+        if (!handleGroup.isEmpty()) {
+            log.warn("The handle cannot be completely cleared, the remaining 
quantity is {}, key:{}", handleGroup.getHandleNum(), key);
+            receiptHandleGroupMap.putIfAbsent(key, handleGroup);
+        }
     }
 
     protected void clearAllHandle() {

Reply via email to