This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 348f58a5b [ISSUE #5813] Fix topic queue lock not unlock in pop consumption mode (#5815)
348f58a5b is described below

commit 348f58a5b1e582386c7d734064771356d07185bd
Author: SSpirits <[email protected]>
AuthorDate: Wed Jan 4 15:06:23 2023 +0800

    [ISSUE #5813] Fix topic queue lock not unlock in pop consumption mode (#5815)
---
 .../rocketmq/broker/processor/PopMessageProcessor.java    | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6fe7b6782..0f4de599a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -525,17 +525,21 @@ public class PopMessageProcessor implements NettyRequestProcessor {
         boolean isOrder = requestHeader.isOrder();
         long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), 
queueId, requestHeader.getInitMode(),
             false, lockKey, false);
+        CompletableFuture<Long> future = new CompletableFuture<>();
         if (!queueLockManager.tryLock(lockKey)) {
             restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset + restNum;
-            return CompletableFuture.completedFuture(restNum);
+            future.complete(restNum);
+            return future;
         }
 
         try {
+            future.whenComplete((result, throwable) -> 
queueLockManager.unLock(lockKey));
             offset = getPopOffset(topic, requestHeader.getConsumerGroup(), 
queueId, requestHeader.getInitMode(),
                 true, lockKey, true);
             if (isOrder && 
brokerController.getConsumerOrderInfoManager().checkBlock(topic,
                 requestHeader.getConsumerGroup(), queueId, 
requestHeader.getInvisibleTime())) {
-                return 
CompletableFuture.completedFuture(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
 queueId) - offset + restNum);
+                
future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic,
 queueId) - offset + restNum);
+                return future;
             }
 
             if (isOrder) {
@@ -548,12 +552,13 @@ public class PopMessageProcessor implements NettyRequestProcessor {
 
             if (getMessageResult.getMessageMapedList().size() >= 
requestHeader.getMaxMsgNums()) {
                 restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset + restNum;
-                return CompletableFuture.completedFuture(restNum);
+                future.complete(restNum);
+                return future;
             }
         } catch (Exception e) {
             POP_LOGGER.error("Exception in popMsgFromQueue", e);
-            queueLockManager.unLock(lockKey);
-            return CompletableFuture.completedFuture(restNum);
+            future.complete(restNum);
+            return future;
         }
 
         AtomicLong atomicRestNum = new AtomicLong(restNum);

Reply via email to