This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5f2642391d [ISSUE #8877] Refactor lock in ReceiptHandleGroup to make
the lock can be properly released when future can not be completed (#8916)
5f2642391d is described below
commit 5f2642391dad0ec4043c6984a9b8b038f10f89b9
Author: qianye <[email protected]>
AuthorDate: Thu Nov 21 17:53:46 2024 +0800
[ISSUE #8877] Refactor lock in ReceiptHandleGroup to make the lock can be
properly released when future can not be completed (#8916)
---
.../rocketmq/proxy/common/ReceiptHandleGroup.java | 59 +++++++++++++++++-----
1 file changed, 47 insertions(+), 12 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index 6fee38d117..15da628dc3 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -25,14 +25,19 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
public class ReceiptHandleGroup {
+ protected final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
// The messages having the same messageId will be deduplicated based on
the parameters of broker, queueId, and offset
protected final Map<String /* msgID */, Map<HandleKey, HandleData>>
receiptHandleMap = new ConcurrentHashMap<>();
@@ -98,6 +103,7 @@ public class ReceiptHandleGroup {
public static class HandleData {
private final Semaphore semaphore = new Semaphore(1);
+ private final AtomicLong lastLockTimeMs = new AtomicLong(-1L);
private volatile boolean needRemove = false;
private volatile MessageReceiptHandle messageReceiptHandle;
@@ -105,15 +111,40 @@ public class ReceiptHandleGroup {
this.messageReceiptHandle = messageReceiptHandle;
}
- public boolean lock(long timeoutMs) {
+ public Long lock(long timeoutMs) {
try {
- return this.semaphore.tryAcquire(timeoutMs,
TimeUnit.MILLISECONDS);
+ boolean result = this.semaphore.tryAcquire(timeoutMs,
TimeUnit.MILLISECONDS);
+ long currentTimeMs = System.currentTimeMillis();
+ if (result) {
+ this.lastLockTimeMs.set(currentTimeMs);
+ return currentTimeMs;
+ } else {
+ // if the lock is expired, can be acquired again
+ long expiredTimeMs =
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 3;
+ if (currentTimeMs - this.lastLockTimeMs.get() >
expiredTimeMs) {
+ synchronized (this) {
+ if (currentTimeMs - this.lastLockTimeMs.get() >
expiredTimeMs) {
+ log.warn("HandleData lock expired, acquire
lock success and reset lock time. " +
+ "MessageReceiptHandle={}, lockTime={}",
messageReceiptHandle, currentTimeMs);
+ this.lastLockTimeMs.set(currentTimeMs);
+ return currentTimeMs;
+ }
+ }
+ }
+ }
+ return null;
} catch (InterruptedException e) {
- return false;
+ return null;
}
}
- public void unlock() {
+ public void unlock(long lockTimeMs) {
+ // if the lock is expired, we don't need to unlock it
+ if (System.currentTimeMillis() - lockTimeMs >
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup() * 2) {
+ log.warn("HandleData lock expired, unlock fail.
MessageReceiptHandle={}, lockTime={}, now={}",
+ messageReceiptHandle, lockTimeMs,
System.currentTimeMillis());
+ return;
+ }
this.semaphore.release();
}
@@ -149,7 +180,8 @@ public class ReceiptHandleGroup {
if (handleData == null || handleData.needRemove) {
return new HandleData(value);
}
- if (!handleData.lock(timeout)) {
+ Long lockTimeMs = handleData.lock(timeout);
+ if (lockTimeMs == null) {
throw new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to put handle
failed");
}
try {
@@ -158,7 +190,7 @@ public class ReceiptHandleGroup {
}
handleData.messageReceiptHandle = value;
} finally {
- handleData.unlock();
+ handleData.unlock(lockTimeMs);
}
return handleData;
});
@@ -176,7 +208,8 @@ public class ReceiptHandleGroup {
long timeout =
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
handleMap.computeIfPresent(new HandleKey(handle), (handleKey,
handleData) -> {
- if (!handleData.lock(timeout)) {
+ Long lockTimeMs = handleData.lock(timeout);
+ if (lockTimeMs == null) {
throw new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to get handle
failed");
}
try {
@@ -185,7 +218,7 @@ public class ReceiptHandleGroup {
}
res.set(handleData.messageReceiptHandle);
} finally {
- handleData.unlock();
+ handleData.unlock(lockTimeMs);
}
return handleData;
});
@@ -200,7 +233,8 @@ public class ReceiptHandleGroup {
long timeout =
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
handleMap.computeIfPresent(new HandleKey(handle), (handleKey,
handleData) -> {
- if (!handleData.lock(timeout)) {
+ Long lockTimeMs = handleData.lock(timeout);
+ if (lockTimeMs == null) {
throw new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to remove and get
handle failed");
}
try {
@@ -210,7 +244,7 @@ public class ReceiptHandleGroup {
}
return null;
} finally {
- handleData.unlock();
+ handleData.unlock(lockTimeMs);
}
});
removeHandleMapKeyIfNeed(msgID);
@@ -240,7 +274,8 @@ public class ReceiptHandleGroup {
}
long timeout =
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
handleMap.computeIfPresent(new HandleKey(handle), (handleKey,
handleData) -> {
- if (!handleData.lock(timeout)) {
+ Long lockTimeMs = handleData.lock(timeout);
+ if (lockTimeMs == null) {
throw new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to compute
failed");
}
CompletableFuture<MessageReceiptHandle> future =
function.apply(handleData.messageReceiptHandle);
@@ -255,7 +290,7 @@ public class ReceiptHandleGroup {
handleData.messageReceiptHandle = messageReceiptHandle;
}
} finally {
- handleData.unlock();
+ handleData.unlock(lockTimeMs);
}
if (handleData.needRemove) {
handleMap.remove(handleKey, handleData);