This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d9ebe88767 [ISSUE #8268] Fix pop orderly commitOffset when
NO_MATCHED_MESSAGE (#8270)
d9ebe88767 is described below
commit d9ebe887677646ec8e969c03ea0d394a299894e0
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Thu Jun 6 20:29:59 2024 +0800
[ISSUE #8268] Fix pop orderly commitOffset when NO_MATCHED_MESSAGE (#8270)
---
.../apache/rocketmq/broker/processor/PopMessageProcessor.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 3df4bec984..0304a5dab0 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -638,10 +638,13 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
||
GetMessageStatus.MESSAGE_WAS_REMOVING.equals(result.getStatus())
||
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(result.getStatus()))
&& result.getNextBeginOffset() > -1) {
-
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic,
queueId, finalOffset,
- requestHeader.getInvisibleTime(), popTime, reviveQid,
result.getNextBeginOffset(),
brokerController.getBrokerConfig().getBrokerName());
-//
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic,
-// queueId, getMessageTmpResult.getNextBeginOffset());
+ if (isOrder) {
+
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic,
+ queueId, result.getNextBeginOffset());
+ } else {
+
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic,
queueId, finalOffset,
+ requestHeader.getInvisibleTime(), popTime,
reviveQid, result.getNextBeginOffset(),
brokerController.getBrokerConfig().getBrokerName());
+ }
}
atomicRestNum.set(result.getMaxOffset() -
result.getNextBeginOffset() + atomicRestNum.get());