This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2eb537cdaa [ISSUE #9980] Skip invalid Pop records when consumer group does not exist (#9982)
2eb537cdaa is described below

commit 2eb537cdaa4d41c24897c374aff2920861a62e87
Author: Aman Gautam <[email protected]>
AuthorDate: Tue Jan 20 11:32:15 2026 +0530

    [ISSUE #9980] Skip invalid Pop records when consumer group does not exist (#9982)
    
    Signed-off-by: Aman Gautam <[email protected]>
---
 .../broker/processor/PopBufferMergeService.java      | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 06d89e047d..5373eaea33 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -216,6 +216,13 @@ public class PopBufferMergeService extends ServiceThread {
         }
     }
 
+    private boolean isSubscriptionGroupNotExist(PopCheckPointWrapper pointWrapper) {
+        String group = pointWrapper.getCk().getCId();
+        return brokerController.getSubscriptionGroupManager()
+                .findSubscriptionGroupConfig(group) == null;
+    }
+
+
     private void scan() {
         long startTime = System.currentTimeMillis();
         AtomicInteger count = new AtomicInteger(0);
@@ -225,6 +232,19 @@ public class PopBufferMergeService extends ServiceThread {
             Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();
             PopCheckPointWrapper pointWrapper = entry.getValue();
 
+            // Skip invalid POP records when consumer group does not exist
+            if (isSubscriptionGroupNotExist(pointWrapper)) {
+                POP_LOGGER.warn(
+                        "[PopBuffer] skip pop record because consumer group not exist, group={}, ck={}",
+                        pointWrapper.getCk().getCId(),
+                        pointWrapper
+                );
+                iterator.remove();
+                counter.decrementAndGet();
+                continue;
+            }
+
+
             // just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)
             if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)
                 || isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {

Reply via email to