This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 854369aa38 [ISSUE #9339] Fix pop update consumption offset when
message are filtered (#9340)
854369aa38 is described below
commit 854369aa38ce574ccf931b99bdb799a6fb533183
Author: lizhimins <[email protected]>
AuthorDate: Wed Apr 16 16:00:37 2025 +0800
[ISSUE #9339] Fix pop update consumption offset when message are filtered
(#9340)
---
.../rocketmq/broker/pop/PopConsumerService.java | 23 +++++++++++-----------
.../broker/processor/PopMessageProcessor.java | 4 +++-
.../broker/pop/PopConsumerServiceTest.java | 2 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 2 +-
4 files changed, 17 insertions(+), 14 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index dde13a5ed7..1138ff4afe 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -162,17 +162,15 @@ public class PopConsumerService extends ServiceThread {
return result;
}
- public PopConsumerContext addGetMessageResult(PopConsumerContext context,
GetMessageResult result,
+ public PopConsumerContext handleGetMessageResult(PopConsumerContext
context, GetMessageResult result,
String topicId, int queueId, PopConsumerRecord.RetryType retryType,
long offset) {
- if (result.getStatus() == GetMessageStatus.FOUND &&
!result.getMessageQueueOffset().isEmpty()) {
+ if (GetMessageStatus.FOUND.equals(result.getStatus()) &&
!result.getMessageQueueOffset().isEmpty()) {
if (context.isFifo()) {
this.setFifoBlocked(context, context.getGroupId(), topicId,
queueId, result.getMessageQueueOffset());
}
-
- // build request header here
+ // build response header here
context.addGetMessageResult(result, topicId, queueId, retryType,
offset);
-
if (brokerConfig.isPopConsumerKVServiceLog()) {
log.info("PopConsumerService pop, time={}, invisible={}, " +
"groupId={}, topic={}, queueId={}, offset={},
attemptId={}",
@@ -181,20 +179,23 @@ public class PopConsumerService extends ServiceThread {
}
}
- if (!context.isFifo() && result.getNextBeginOffset() >
OFFSET_NOT_EXIST) {
+ long commitOffset = offset;
+ if (context.isFifo()) {
+ if (!GetMessageStatus.FOUND.equals(result.getStatus())) {
+ commitOffset = result.getNextBeginOffset();
+ }
+ } else {
this.brokerController.getConsumerOffsetManager().commitPullOffset(
context.getClientHost(), context.getGroupId(), topicId,
queueId, result.getNextBeginOffset());
- long commitOffset = result.getStatus() == GetMessageStatus.FOUND ?
offset : result.getNextBeginOffset();
if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache !=
null) {
long minOffset =
popConsumerCache.getMinOffsetInCache(context.getGroupId(), topicId, queueId);
if (minOffset != OFFSET_NOT_EXIST) {
commitOffset = minOffset;
}
}
- this.brokerController.getConsumerOffsetManager().commitOffset(
- context.getClientHost(), context.getGroupId(), topicId,
queueId, commitOffset);
}
-
+ this.brokerController.getConsumerOffsetManager().commitOffset(
+ context.getClientHost(), context.getGroupId(), topicId, queueId,
commitOffset);
return context;
}
@@ -310,7 +311,7 @@ public class PopConsumerService extends ServiceThread {
} else {
final long consumeOffset = this.getPopOffset(groupId, topicId,
queueId, result.getInitMode());
return getMessageAsync(clientHost, groupId, topicId, queueId,
consumeOffset, remain, filter)
- .thenApply(getMessageResult -> addGetMessageResult(
+ .thenApply(getMessageResult -> handleGetMessageResult(
result, getMessageResult, topicId, queueId, retryType,
consumeOffset));
}
});
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index dd8314b7e0..d73acc84df 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -680,7 +680,9 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
CompletableFuture<Long> future = new CompletableFuture<>();
if (!queueLockManager.tryLock(lockKey)) {
try {
- restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) -
offset + restNum;
+ if (!requestHeader.isOrder()) {
+ restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) -
offset + restNum;
+ }
future.complete(restNum);
} catch (ConsumeQueueException e) {
future.completeExceptionally(e);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 7fb619f740..9c23a8625e 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -195,7 +195,7 @@ public class PopConsumerServiceTest {
GetMessageResult result = new GetMessageResult();
result.setStatus(GetMessageStatus.FOUND);
result.getMessageQueueOffset().add(100L);
- consumerService.addGetMessageResult(
+ consumerService.handleGetMessageResult(
context, result, topicId, queueId,
PopConsumerRecord.RetryType.NORMAL_TOPIC, 100);
Assert.assertEquals(1, context.getGetMessageResultList().size());
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index fc6bc4213a..360523be85 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -875,7 +875,7 @@ public class DefaultMessageStore implements MessageStore {
boolean isInMem =
estimateInMemByCommitOffset(offsetPy, maxOffsetPy);
- if ((cqUnit.getQueueOffset() - offset) *
consumeQueue.getUnitSize() > maxFilterMessageSize) {
+ if ((cqUnit.getQueueOffset() - offset) *
consumeQueue.getUnitSize() >= maxFilterMessageSize) {
break;
}