This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1a15729ca9 [ISSUE #8601]When isPopShouldStop hit,unlock
queueLockManager (#8602)
1a15729ca9 is described below
commit 1a15729ca962d76ffe044f6332ec711b1d7546bc
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Fri Aug 30 13:43:01 2024 +0800
[ISSUE #8601]When isPopShouldStop hit,unlock queueLockManager (#8602)
* fix:when isPopShouldStop hit, unlock queueLockManager
* fix:when isPopShouldStop hit, unlock queueLockManager
* fix: limit rate of appending commit in case of DLedger commit-log
Signed-off-by: Zhanhui Li <[email protected]>
---------
Signed-off-by: Zhanhui Li <[email protected]>
Co-authored-by: Zhanhui Li <[email protected]>
---
.../org/apache/rocketmq/broker/processor/PopMessageProcessor.java | 2 +-
.../org/apache/rocketmq/store/dledger/MessageStoreTestBase.java | 7 +++++++
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 47ef8e4013..5430fdec94 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -540,6 +540,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return future;
}
+ future.whenComplete((result, throwable) ->
queueLockManager.unLock(lockKey));
if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId))
{
POP_LOGGER.warn("Too much msgs unacked, then stop poping.
topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(),
queueId);
restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) -
offset + restNum;
@@ -548,7 +549,6 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
try {
- future.whenComplete((result, throwable) ->
queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInitMode(),
true, lockKey, true);
diff --git
a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index a21806ffcf..c4d9f0727b 100644
---
a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++
b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.dledger;
+import com.google.common.util.concurrent.RateLimiter;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File;
@@ -122,7 +123,13 @@ public class MessageStoreTestBase extends StoreTestBase {
}
protected void doPutMessages(MessageStore messageStore, String topic, int
queueId, int num, long beginLogicsOffset) throws UnknownHostException {
+ RateLimiter rateLimiter = RateLimiter.create(100);
+ MessageStoreConfig storeConfig = messageStore.getMessageStoreConfig();
+ boolean limitAppendRate = storeConfig.isEnableDLegerCommitLog();
for (int i = 0; i < num; i++) {
+ if (limitAppendRate) {
+ rateLimiter.acquire();
+ }
MessageExtBrokerInner msgInner = buildMessage();
msgInner.setTopic(topic);
msgInner.setQueueId(queueId);