This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e1007439e5 [ISSUE #10181] Some minor fixes in PopConsumerService (#10182)
e1007439e5 is described below

commit e1007439e565b215dd6ed9dc3f945b7d6f139383
Author: lizhimins <[email protected]>
AuthorDate: Mon Mar 23 11:39:52 2026 +0800

    [ISSUE #10181] Some minor fixes in PopConsumerService (#10182)
---
 .../org/apache/rocketmq/broker/pop/PopConsumerService.java     | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index d76651643d..e13a81b144 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -236,8 +236,8 @@ public class PopConsumerService extends ServiceThread {
     public CompletableFuture<GetMessageResult> getMessageAsync(String 
clientHost,
         String groupId, String topicId, int queueId, long offset, int 
batchSize, MessageFilter filter) {
 
-        log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, 
queueId={}, offset={}, batchSize={}, filter={}",
-            groupId, topicId, offset, queueId, batchSize, filter != null);
+        log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, 
queueId={}, " +
+            "offset={}, batchSize={}, filter={}", groupId, topicId, queueId, 
offset, batchSize, filter != null);
 
         CompletableFuture<GetMessageResult> getMessageFuture =
             brokerController.getMessageStore().getMessageAsync(groupId, 
topicId, queueId, offset, batchSize, filter);
@@ -552,7 +552,7 @@ public class PopConsumerService extends ServiceThread {
 
     @SuppressWarnings("StatementWithEmptyBody")
     public void clearCache(String groupId, String topicId, int queueId) {
-        while (consumerLockService.tryLock(groupId, topicId)) {
+        while (!consumerLockService.tryLock(groupId, topicId)) {
         }
         try {
             if (popConsumerCache != null) {
@@ -592,7 +592,7 @@ public class PopConsumerService extends ServiceThread {
                 if (!result) {
                     if (record.getAttemptTimes() < 
brokerConfig.getPopReviveMaxAttemptTimes()) {
                         long backoffInterval = 1000L * 
REWRITE_INTERVALS_IN_SECONDS[
-                            Math.min(REWRITE_INTERVALS_IN_SECONDS.length, 
record.getAttemptTimes())];
+                            Math.min(REWRITE_INTERVALS_IN_SECONDS.length - 1, 
record.getAttemptTimes())];
                         long nextInvisibleTime = record.getInvisibleTime() + 
backoffInterval;
                         PopConsumerRecord retryRecord = new 
PopConsumerRecord(System.currentTimeMillis(),
                             record.getGroupId(), record.getTopicId(), 
record.getQueueId(),
@@ -760,7 +760,7 @@ public class PopConsumerService extends ServiceThread {
                     ck.setQueueId(record.getQueueId());
                     ck.setBrokerName(brokerConfig.getBrokerName());
                     ck.addDiff(0);
-                    ck.setRePutTimes(ck.getRePutTimes());
+                    ck.setRePutTimes(String.valueOf(record.getAttemptTimes()));
                     int reviveQueueId = (int) record.getOffset() % 
brokerConfig.getReviveQueueNum();
                     MessageExtBrokerInner ckMsg =
                         
brokerController.getPopMessageProcessor().buildCkMsg(ck, reviveQueueId);

Reply via email to