This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5c34a59cd9 [ISSUE #9184] Optimize QueueLockManager#tryLock method
(#9185)
5c34a59cd9 is described below
commit 5c34a59cd90074a544525de6609f8ef400dfe2b3
Author: mxsm <[email protected]>
AuthorDate: Fri Mar 7 13:46:52 2025 +0800
[ISSUE #9184] Optimize QueueLockManager#tryLock method (#9185)
---
.../broker/processor/PopMessageProcessor.java | 40 ++++++++--------------
1 file changed, 14 insertions(+), 26 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 9355af319e..b84afe2194 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -64,6 +64,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCallback;
@@ -150,11 +151,11 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) {
return batchAckMsg.getTopic()
- + PopAckConstants.SPLIT + batchAckMsg.getQueueId()
- + PopAckConstants.SPLIT +
batchAckMsg.getAckOffsetList().toString()
- + PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
- + PopAckConstants.SPLIT + batchAckMsg.getPopTime()
- + PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
+ + PopAckConstants.SPLIT + batchAckMsg.getQueueId()
+ + PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString()
+ + PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
+ + PopAckConstants.SPLIT + batchAckMsg.getPopTime()
+ + PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
}
public static String genCkUniqueId(PopCheckPoint ck) {
@@ -861,7 +862,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
private boolean isPopShouldStop(String topic, String group, int queueId) {
return
brokerController.getBrokerConfig().isEnablePopMessageThreshold() &&
-
brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic,
group, queueId) >
brokerController.getBrokerConfig().getPopInflightMessageThreshold();
+
brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic,
group, queueId) >
brokerController.getBrokerConfig().getPopInflightMessageThreshold();
}
private long getPopOffset(String topic, String group, int queueId, int
initMode, boolean init, String lockKey,
@@ -908,7 +909,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
if (init) { // whichever initMode
this.brokerController.getConsumerOffsetManager().commitOffset(
- "getPopOffset", group, topic, queueId, offset);
+ "getPopOffset", group, topic, queueId, offset);
}
return offset;
}
@@ -1002,12 +1003,13 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
private volatile long lockTime;
public TimedLock() {
- this.lock = new AtomicBoolean(true);
+ // init lock status, false means not locked
+ this.lock = new AtomicBoolean(false);
this.lockTime = System.currentTimeMillis();
}
public boolean tryLock() {
- boolean ret = lock.compareAndSet(true, false);
+ boolean ret = lock.compareAndSet(false, true);
if (ret) {
this.lockTime = System.currentTimeMillis();
return true;
@@ -1017,11 +1019,11 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
public void unLock() {
- lock.set(true);
+ lock.set(false);
}
public boolean isLock() {
- return !lock.get();
+ return lock.get();
}
public long getLockTime() {
@@ -1041,21 +1043,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
public boolean tryLock(String key) {
- TimedLock timedLock = expiredLocalCache.get(key);
-
- if (timedLock == null) {
- TimedLock old = expiredLocalCache.putIfAbsent(key, new
TimedLock());
- if (old != null) {
- return false;
- } else {
- timedLock = expiredLocalCache.get(key);
- }
- }
-
- if (timedLock == null) {
- return false;
- }
-
+ TimedLock timedLock =
ConcurrentHashMapUtils.computeIfAbsent(expiredLocalCache, key, k -> new
TimedLock());
return timedLock.tryLock();
}