This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 348f58a5b [ISSUE #5813] Fix topic queue lock not being unlocked in pop
consumption mode (#5815)
348f58a5b is described below
commit 348f58a5b1e582386c7d734064771356d07185bd
Author: SSpirits <[email protected]>
AuthorDate: Wed Jan 4 15:06:23 2023 +0800
[ISSUE #5813] Fix topic queue lock not being unlocked in pop consumption mode
(#5815)
---
.../rocketmq/broker/processor/PopMessageProcessor.java | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6fe7b6782..0f4de599a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -525,17 +525,21 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
boolean isOrder = requestHeader.isOrder();
long offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInitMode(),
false, lockKey, false);
+ CompletableFuture<Long> future = new CompletableFuture<>();
if (!queueLockManager.tryLock(lockKey)) {
restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) -
offset + restNum;
- return CompletableFuture.completedFuture(restNum);
+ future.complete(restNum);
+ return future;
}
try {
+ future.whenComplete((result, throwable) ->
queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInitMode(),
true, lockKey, true);
if (isOrder &&
brokerController.getConsumerOrderInfoManager().checkBlock(topic,
requestHeader.getConsumerGroup(), queueId,
requestHeader.getInvisibleTime())) {
- return
CompletableFuture.completedFuture(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
queueId) - offset + restNum);
+
future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
queueId) - offset + restNum);
+ return future;
}
if (isOrder) {
@@ -548,12 +552,13 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
if (getMessageResult.getMessageMapedList().size() >=
requestHeader.getMaxMsgNums()) {
restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) -
offset + restNum;
- return CompletableFuture.completedFuture(restNum);
+ future.complete(restNum);
+ return future;
}
} catch (Exception e) {
POP_LOGGER.error("Exception in popMsgFromQueue", e);
- queueLockManager.unLock(lockKey);
- return CompletableFuture.completedFuture(restNum);
+ future.complete(restNum);
+ return future;
}
AtomicLong atomicRestNum = new AtomicLong(restNum);