This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d9ebe88767 [ISSUE #8268] Fix pop orderly commitOffset when NO_MATCHED_MESSAGE (#8270)
d9ebe88767 is described below

commit d9ebe887677646ec8e969c03ea0d394a299894e0
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Thu Jun 6 20:29:59 2024 +0800

    [ISSUE #8268] Fix pop orderly commitOffset when NO_MATCHED_MESSAGE (#8270)
---
 .../apache/rocketmq/broker/processor/PopMessageProcessor.java | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 3df4bec984..0304a5dab0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -638,10 +638,13 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                     || 
GetMessageStatus.MESSAGE_WAS_REMOVING.equals(result.getStatus())
                     || 
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.equals(result.getStatus()))
                     && result.getNextBeginOffset() > -1) {
-                    
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, 
queueId, finalOffset,
-                        requestHeader.getInvisibleTime(), popTime, reviveQid, 
result.getNextBeginOffset(), 
brokerController.getBrokerConfig().getBrokerName());
-//                
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
 requestHeader.getConsumerGroup(), topic,
-//                        queueId, getMessageTmpResult.getNextBeginOffset());
+                    if (isOrder) {
+                        
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
 requestHeader.getConsumerGroup(), topic,
+                            queueId, result.getNextBeginOffset());
+                    } else {
+                        
popBufferMergeService.addCkMock(requestHeader.getConsumerGroup(), topic, 
queueId, finalOffset,
+                            requestHeader.getInvisibleTime(), popTime, 
reviveQid, result.getNextBeginOffset(), 
brokerController.getBrokerConfig().getBrokerName());
+                    }
                 }
 
                 atomicRestNum.set(result.getMaxOffset() - 
result.getNextBeginOffset() + atomicRestNum.get());

Reply via email to