This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2eb537cdaa [ISSUE #9980] Skip invalid Pop records when consumer group
does not exist (#9982)
2eb537cdaa is described below
commit 2eb537cdaa4d41c24897c374aff2920861a62e87
Author: Aman Gautam <[email protected]>
AuthorDate: Tue Jan 20 11:32:15 2026 +0530
[ISSUE #9980] Skip invalid Pop records when consumer group does not exist
(#9982)
Signed-off-by: Aman Gautam <[email protected]>
---
.../broker/processor/PopBufferMergeService.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 06d89e047d..5373eaea33 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -216,6 +216,13 @@ public class PopBufferMergeService extends ServiceThread {
}
}
+ private boolean isSubscriptionGroupNotExist(PopCheckPointWrapper
pointWrapper) {
+ String group = pointWrapper.getCk().getCId();
+ return brokerController.getSubscriptionGroupManager()
+ .findSubscriptionGroupConfig(group) == null;
+ }
+
+
private void scan() {
long startTime = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
@@ -225,6 +232,19 @@ public class PopBufferMergeService extends ServiceThread {
Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();
PopCheckPointWrapper pointWrapper = entry.getValue();
+ // Skip invalid POP records when consumer group does not exist
+ if (isSubscriptionGroupNotExist(pointWrapper)) {
+ POP_LOGGER.warn(
+ "[PopBuffer] skip pop record because consumer group
not exist, group={}, ck={}",
+ pointWrapper.getCk().getCId(),
+ pointWrapper
+ );
+ iterator.remove();
+ counter.decrementAndGet();
+ continue;
+ }
+
+
// just process offset(already stored at pull thread), or buffer
ck(not stored and ack finish)
if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() ||
isCkDone(pointWrapper)
|| isCkDoneForFinish(pointWrapper) &&
pointWrapper.isCkStored()) {