This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e1007439e5 [ISSUE #10181] Some minor fixes in PopConsumerService
(#10182)
e1007439e5 is described below
commit e1007439e565b215dd6ed9dc3f945b7d6f139383
Author: lizhimins <[email protected]>
AuthorDate: Mon Mar 23 11:39:52 2026 +0800
[ISSUE #10181] Some minor fixes in PopConsumerService (#10182)
---
.../org/apache/rocketmq/broker/pop/PopConsumerService.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index d76651643d..e13a81b144 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -236,8 +236,8 @@ public class PopConsumerService extends ServiceThread {
public CompletableFuture<GetMessageResult> getMessageAsync(String
clientHost,
String groupId, String topicId, int queueId, long offset, int
batchSize, MessageFilter filter) {
- log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={},
queueId={}, offset={}, batchSize={}, filter={}",
- groupId, topicId, offset, queueId, batchSize, filter != null);
+ log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={},
queueId={}, " +
+ "offset={}, batchSize={}, filter={}", groupId, topicId, queueId,
offset, batchSize, filter != null);
CompletableFuture<GetMessageResult> getMessageFuture =
brokerController.getMessageStore().getMessageAsync(groupId,
topicId, queueId, offset, batchSize, filter);
@@ -552,7 +552,7 @@ public class PopConsumerService extends ServiceThread {
@SuppressWarnings("StatementWithEmptyBody")
public void clearCache(String groupId, String topicId, int queueId) {
- while (consumerLockService.tryLock(groupId, topicId)) {
+ while (!consumerLockService.tryLock(groupId, topicId)) {
}
try {
if (popConsumerCache != null) {
@@ -592,7 +592,7 @@ public class PopConsumerService extends ServiceThread {
if (!result) {
if (record.getAttemptTimes() <
brokerConfig.getPopReviveMaxAttemptTimes()) {
long backoffInterval = 1000L *
REWRITE_INTERVALS_IN_SECONDS[
- Math.min(REWRITE_INTERVALS_IN_SECONDS.length,
record.getAttemptTimes())];
+ Math.min(REWRITE_INTERVALS_IN_SECONDS.length - 1,
record.getAttemptTimes())];
long nextInvisibleTime = record.getInvisibleTime() +
backoffInterval;
PopConsumerRecord retryRecord = new
PopConsumerRecord(System.currentTimeMillis(),
record.getGroupId(), record.getTopicId(),
record.getQueueId(),
@@ -760,7 +760,7 @@ public class PopConsumerService extends ServiceThread {
ck.setQueueId(record.getQueueId());
ck.setBrokerName(brokerConfig.getBrokerName());
ck.addDiff(0);
- ck.setRePutTimes(ck.getRePutTimes());
+ ck.setRePutTimes(String.valueOf(record.getAttemptTimes()));
int reviveQueueId = (int) record.getOffset() %
brokerConfig.getReviveQueueNum();
MessageExtBrokerInner ckMsg =
brokerController.getPopMessageProcessor().buildCkMsg(ck, reviveQueueId);