This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new edaffe6906 [ISSUE #9695] Not use pull offset when use pop orderly consume (#9696)
edaffe6906 is described below

commit edaffe69060bfd95f63e291308d28ce03738a1fb
Author: lizhimins <[email protected]>
AuthorDate: Fri Sep 12 17:26:16 2025 +0800

    [ISSUE #9695] Not use pull offset when use pop orderly consume (#9696)
---
 .../org/apache/rocketmq/broker/pop/PopConsumerService.java   | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 1138ff4afe..277a4999cf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -199,8 +199,14 @@ public class PopConsumerService extends ServiceThread {
         return context;
     }
 
-    public long getPopOffset(String groupId, String topicId, int queueId, int initMode) {
-        long offset = this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
+    public long getPopOffset(String groupId, String topicId, int queueId, int initMode, boolean fifo) {
+
+        // For FIFO messages, the pull offset is not used.
+        // This preserves compatibility when switching from pull consumer to pop consumer.
+        long offset = fifo ?
+            this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, queueId) :
+            this.brokerController.getConsumerOffsetManager().queryPullOffset(groupId, topicId, queueId);
+
         if (offset < 0L) {
             try {
                 offset = this.brokerController.getPopMessageProcessor()
@@ -309,7 +315,7 @@ public class PopConsumerService extends ServiceThread {
                 result.addRestCount(this.getPendingFilterCount(groupId, topicId, queueId));
                 return CompletableFuture.completedFuture(result);
             } else {
-                final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode());
+                final long consumeOffset = this.getPopOffset(groupId, topicId, queueId, result.getInitMode(), result.isFifo());
                 return getMessageAsync(clientHost, groupId, topicId, queueId, consumeOffset, remain, filter)
                     .thenApply(getMessageResult -> handleGetMessageResult(
                         result, getMessageResult, topicId, queueId, retryType, consumeOffset));

Reply via email to