This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5b64ffc9e [RIP-48] Not commit offset when broker has server-side
offset in ack message (#5878)
5b64ffc9e is described below
commit 5b64ffc9e86126342d2a441849124098e6e909ef
Author: lizhimins <[email protected]>
AuthorDate: Fri Jan 13 16:45:13 2023 +0800
[RIP-48] Not commit offset when broker has server-side offset in ack
message (#5878)
---
.../rocketmq/broker/processor/AckMessageProcessor.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 80f06aed0..cbe627bdd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -167,12 +167,13 @@ public class AckMessageProcessor implements NettyRequestProcessor {
requestHeader.getQueueId(), requestHeader.getOffset(),
ExtraInfoUtil.getPopTime(extraInfo));
if (nextOffset > -1) {
- this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
- requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getQueueId(),
- nextOffset);
- this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
- requestHeader.getQueueId());
+ if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId())) {
+ this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+ requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
+ this.brokerController.getPopMessageProcessor().notifyMessageArriving(
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId());
+ }
} else if (nextOffset == -1) {
String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
lockKey, oldOffset, requestHeader.getOffset(), nextOffset, channel.remoteAddress());