This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 854369aa38 [ISSUE #9339] Fix pop update consumption offset when 
message are filtered (#9340)
854369aa38 is described below

commit 854369aa38ce574ccf931b99bdb799a6fb533183
Author: lizhimins <[email protected]>
AuthorDate: Wed Apr 16 16:00:37 2025 +0800

    [ISSUE #9339] Fix pop update consumption offset when message are filtered 
(#9340)
---
 .../rocketmq/broker/pop/PopConsumerService.java    | 23 +++++++++++-----------
 .../broker/processor/PopMessageProcessor.java      |  4 +++-
 .../broker/pop/PopConsumerServiceTest.java         |  2 +-
 .../apache/rocketmq/store/DefaultMessageStore.java |  2 +-
 4 files changed, 17 insertions(+), 14 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index dde13a5ed7..1138ff4afe 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -162,17 +162,15 @@ public class PopConsumerService extends ServiceThread {
         return result;
     }
 
-    public PopConsumerContext addGetMessageResult(PopConsumerContext context, 
GetMessageResult result,
+    public PopConsumerContext handleGetMessageResult(PopConsumerContext 
context, GetMessageResult result,
         String topicId, int queueId, PopConsumerRecord.RetryType retryType, 
long offset) {
 
-        if (result.getStatus() == GetMessageStatus.FOUND && 
!result.getMessageQueueOffset().isEmpty()) {
+        if (GetMessageStatus.FOUND.equals(result.getStatus()) && 
!result.getMessageQueueOffset().isEmpty()) {
             if (context.isFifo()) {
                 this.setFifoBlocked(context, context.getGroupId(), topicId, 
queueId, result.getMessageQueueOffset());
             }
-
-            // build request header here
+            // build response header here
             context.addGetMessageResult(result, topicId, queueId, retryType, 
offset);
-
             if (brokerConfig.isPopConsumerKVServiceLog()) {
                 log.info("PopConsumerService pop, time={}, invisible={}, " +
                         "groupId={}, topic={}, queueId={}, offset={}, 
attemptId={}",
@@ -181,20 +179,23 @@ public class PopConsumerService extends ServiceThread {
             }
         }
 
-        if (!context.isFifo() && result.getNextBeginOffset() > 
OFFSET_NOT_EXIST) {
+        long commitOffset = offset;
+        if (context.isFifo()) {
+            if (!GetMessageStatus.FOUND.equals(result.getStatus())) {
+                commitOffset = result.getNextBeginOffset();
+            }
+        } else {
             this.brokerController.getConsumerOffsetManager().commitPullOffset(
                 context.getClientHost(), context.getGroupId(), topicId, 
queueId, result.getNextBeginOffset());
-            long commitOffset = result.getStatus() == GetMessageStatus.FOUND ? 
offset : result.getNextBeginOffset();
             if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != 
null) {
                 long minOffset = 
popConsumerCache.getMinOffsetInCache(context.getGroupId(), topicId, queueId);
                 if (minOffset != OFFSET_NOT_EXIST) {
                     commitOffset = minOffset;
                 }
             }
-            this.brokerController.getConsumerOffsetManager().commitOffset(
-                context.getClientHost(), context.getGroupId(), topicId, 
queueId, commitOffset);
         }
-
+        this.brokerController.getConsumerOffsetManager().commitOffset(
+            context.getClientHost(), context.getGroupId(), topicId, queueId, 
commitOffset);
         return context;
     }
 
@@ -310,7 +311,7 @@ public class PopConsumerService extends ServiceThread {
             } else {
                 final long consumeOffset = this.getPopOffset(groupId, topicId, 
queueId, result.getInitMode());
                 return getMessageAsync(clientHost, groupId, topicId, queueId, 
consumeOffset, remain, filter)
-                    .thenApply(getMessageResult -> addGetMessageResult(
+                    .thenApply(getMessageResult -> handleGetMessageResult(
                         result, getMessageResult, topicId, queueId, retryType, 
consumeOffset));
             }
         });
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index dd8314b7e0..d73acc84df 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -680,7 +680,9 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         CompletableFuture<Long> future = new CompletableFuture<>();
         if (!queueLockManager.tryLock(lockKey)) {
             try {
-                restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset + restNum;
+                if (!requestHeader.isOrder()) {
+                    restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 
offset + restNum;
+                }
                 future.complete(restNum);
             } catch (ConsumeQueueException e) {
                 future.completeExceptionally(e);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 7fb619f740..9c23a8625e 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -195,7 +195,7 @@ public class PopConsumerServiceTest {
         GetMessageResult result = new GetMessageResult();
         result.setStatus(GetMessageStatus.FOUND);
         result.getMessageQueueOffset().add(100L);
-        consumerService.addGetMessageResult(
+        consumerService.handleGetMessageResult(
             context, result, topicId, queueId, 
PopConsumerRecord.RetryType.NORMAL_TOPIC, 100);
         Assert.assertEquals(1, context.getGetMessageResultList().size());
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index fc6bc4213a..360523be85 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -875,7 +875,7 @@ public class DefaultMessageStore implements MessageStore {
 
                             boolean isInMem = 
estimateInMemByCommitOffset(offsetPy, maxOffsetPy);
 
-                            if ((cqUnit.getQueueOffset() - offset) * 
consumeQueue.getUnitSize() > maxFilterMessageSize) {
+                            if ((cqUnit.getQueueOffset() - offset) * 
consumeQueue.getUnitSize() >= maxFilterMessageSize) {
                                 break;
                             }
 

Reply via email to