This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5b64ffc9e [RIP-48] Not commit offset when broker has server-side offset in ack message (#5878)
5b64ffc9e is described below

commit 5b64ffc9e86126342d2a441849124098e6e909ef
Author: lizhimins <[email protected]>
AuthorDate: Fri Jan 13 16:45:13 2023 +0800

    [RIP-48] Not commit offset when broker has server-side offset in ack message (#5878)
---
 .../rocketmq/broker/processor/AckMessageProcessor.java      | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 80f06aed0..cbe627bdd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -167,12 +167,13 @@ public class AckMessageProcessor implements NettyRequestProcessor {
                     requestHeader.getQueueId(), requestHeader.getOffset(),
                     ExtraInfoUtil.getPopTime(extraInfo));
                 if (nextOffset > -1) {
-                    
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
-                        requestHeader.getConsumerGroup(), 
requestHeader.getTopic(),
-                        requestHeader.getQueueId(),
-                        nextOffset);
-                    
this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(),
 requestHeader.getConsumerGroup(),
-                        requestHeader.getQueueId());
+                    if 
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
+                        requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), requestHeader.getQueueId())) {
+                        
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+                            requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
+                        
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
+                            requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), requestHeader.getQueueId());
+                    }
                 } else if (nextOffset == -1) {
                     String errorInfo = String.format("offset is illegal, 
key:%s, old:%d, commit:%d, next:%d, %s",
                         lockKey, oldOffset, requestHeader.getOffset(), 
nextOffset, channel.remoteAddress());

Reply via email to