This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9c3e7fe273 [ISSUE #9847] Reduce lock contention on the HandleData
object to prevent threads from hanging (#9848)
9c3e7fe273 is described below
commit 9c3e7fe27386a4dbf1e921b1784a73e5b9a1158a
Author: qianye <[email protected]>
AuthorDate: Fri Nov 14 15:11:40 2025 +0800
[ISSUE #9847] Reduce lock contention on the HandleData object to prevent
threads from hanging (#9848)
---
.../rocketmq/proxy/common/ReceiptHandleGroup.java | 17 ++++++++++--
.../apache/rocketmq/proxy/config/ProxyConfig.java | 30 ++++++++++++++--------
.../receipt/DefaultReceiptHandleManager.java | 29 +++++++++++++++++----
3 files changed, 58 insertions(+), 18 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index 15da628dc3..a35eaa5886 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -200,6 +200,14 @@ public class ReceiptHandleGroup {
return this.receiptHandleMap.isEmpty();
}
+ public long getHandleNum() {
+ long handleNum = 0L;
+ for (Map.Entry<String, Map<HandleKey, HandleData>> entry :
receiptHandleMap.entrySet()) {
+ handleNum += entry.getValue().size();
+ }
+ return handleNum;
+ }
+
public MessageReceiptHandle get(String msgID, String handle) {
Map<HandleKey, HandleData> handleMap =
this.receiptHandleMap.get(msgID);
if (handleMap == null) {
@@ -268,13 +276,18 @@ public class ReceiptHandleGroup {
public void computeIfPresent(String msgID, String handle,
Function<MessageReceiptHandle,
CompletableFuture<MessageReceiptHandle>> function) {
+ long timeout =
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+ computeIfPresent(msgID, handle, function, timeout);
+ }
+
+ public void computeIfPresent(String msgID, String handle,
+ Function<MessageReceiptHandle,
CompletableFuture<MessageReceiptHandle>> function, long lockTimeout) {
Map<HandleKey, HandleData> handleMap =
this.receiptHandleMap.get(msgID);
if (handleMap == null) {
return;
}
- long timeout =
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
handleMap.computeIfPresent(new HandleKey(handle), (handleKey,
handleData) -> {
- Long lockTimeMs = handleData.lock(timeout);
+ Long lockTimeMs = handleData.lock(lockTimeout);
if (lockTimeMs == null) {
throw new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute
failed");
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index a99b0afc35..bc1919c07a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -17,17 +17,6 @@
package org.apache.rocketmq.proxy.config;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.metrics.MetricsExporterType;
-import org.apache.rocketmq.common.utils.NetworkUtil;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.ProxyMode;
-import org.apache.rocketmq.proxy.common.ProxyException;
-import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
@@ -38,6 +27,16 @@ import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.ProxyMode;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
public class ProxyConfig implements ConfigFile {
private final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -204,6 +203,7 @@ public class ProxyConfig implements ConfigFile {
private long renewAheadTimeMillis = TimeUnit.SECONDS.toMillis(10);
private long renewMaxTimeMillis = TimeUnit.HOURS.toMillis(3);
private long renewSchedulePeriodMillis = TimeUnit.SECONDS.toMillis(5);
+ private int returnHandleGroupThreadPoolNums = 2;
private boolean enableAclRpcHookForClusterMode = false;
@@ -1537,4 +1537,12 @@ public class ProxyConfig implements ConfigFile {
public void setEnableMessageBodyEmptyCheck(boolean
enableMessageBodyEmptyCheck) {
this.enableMessageBodyEmptyCheck = enableMessageBodyEmptyCheck;
}
+
+ public int getReturnHandleGroupThreadPoolNums() {
+ return returnHandleGroupThreadPoolNums;
+ }
+
+ public void setReturnHandleGroupThreadPoolNums(int
returnHandleGroupThreadPoolNums) {
+ this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums;
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 0cb519306e..522ab2b6da 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.state.StateEventListener;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.common.utils.ExceptionUtils;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -53,7 +54,6 @@ import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
-import org.apache.rocketmq.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
@@ -70,6 +70,7 @@ public class DefaultReceiptHandleManager extends
AbstractStartAndShutdown implem
protected final ScheduledExecutorService scheduledExecutorService =
ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("RenewalScheduledThread_"));
protected final ThreadPoolExecutor renewalWorkerService;
+ protected final ThreadPoolExecutor returnHandleGroupWorkerService;
public DefaultReceiptHandleManager(MetadataService metadataService,
ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
this.metadataService = metadataService;
@@ -83,6 +84,13 @@ public class DefaultReceiptHandleManager extends
AbstractStartAndShutdown implem
"RenewalWorkerThread",
proxyConfig.getRenewThreadPoolQueueCapacity()
);
+ this.returnHandleGroupWorkerService =
ThreadPoolMonitor.createAndMonitor(
+ proxyConfig.getReturnHandleGroupThreadPoolNums(),
+ proxyConfig.getReturnHandleGroupThreadPoolNums() * 2,
+ 1, TimeUnit.MINUTES,
+ "ReturnHandleGroupWorkerThread",
+ proxyConfig.getRenewThreadPoolQueueCapacity()
+ );
consumerManager.appendConsumerIdsChangeListener(new
ConsumerIdsChangeListener() {
@Override
public void handle(ConsumerGroupEvent event, String group,
Object... args) {
@@ -172,7 +180,7 @@ public class DefaultReceiptHandleManager extends
AbstractStartAndShutdown implem
protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey
key, ReceiptHandleGroup group, String msgID, String handleStr) {
try {
- group.computeIfPresent(msgID, handleStr, messageReceiptHandle ->
startRenewMessage(context, key, messageReceiptHandle));
+ group.computeIfPresent(msgID, handleStr, messageReceiptHandle ->
startRenewMessage(context, key, messageReceiptHandle), 0);
} catch (Exception e) {
log.error("error when renew message. msgID:{}, handleStr:{}",
msgID, handleStr, e);
}
@@ -237,22 +245,33 @@ public class DefaultReceiptHandleManager extends
AbstractStartAndShutdown implem
if (key == null) {
return;
}
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
ReceiptHandleGroup handleGroup = receiptHandleGroupMap.remove(key);
- if (handleGroup == null) {
+ returnHandleGroupWorkerService.submit(() -> returnHandleGroup(key,
handleGroup));
+ }
+
+ // There is no longer any waiting for lock, and only the locked handles
will be processed immediately,
+ // while the handles that cannot be acquired will be kept waiting for the
next scheduling.
+ private void returnHandleGroup(ReceiptHandleGroupKey key,
ReceiptHandleGroup handleGroup) {
+ if (handleGroup == null || handleGroup.isEmpty()) {
return;
}
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
handleGroup.scan((msgID, handle, v) -> {
try {
handleGroup.computeIfPresent(msgID, handle,
messageReceiptHandle -> {
CompletableFuture<AckResult> future = new
CompletableFuture<>();
eventListener.fireEvent(new RenewEvent(key,
messageReceiptHandle, proxyConfig.getInvisibleTimeMillisWhenClear(),
RenewEvent.EventType.CLEAR_GROUP, future));
return CompletableFuture.completedFuture(null);
- });
+ }, 0);
} catch (Exception e) {
log.error("error when clear handle for group. key:{}", key, e);
}
});
+ // scheduleRenewTask will trigger cleanup again
+ if (!handleGroup.isEmpty()) {
+ log.warn("The handle cannot be completely cleared, the remaining
quantity is {}, key:{}", handleGroup.getHandleNum(), key);
+ receiptHandleGroupMap.putIfAbsent(key, handleGroup);
+ }
}
protected void clearAllHandle() {