This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e75554d5a8 [ISSUE #8804] clean offset when remove group offset
e75554d5a8 is described below
commit e75554d5a8b7708d5a8a5ae9bd723b614f8adf7c
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Thu Oct 10 09:53:06 2024 +0800
[ISSUE #8804] clean offset when remove group offset
---
.../broker/offset/LmqConsumerOffsetManager.java | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
index ce70b1a820..53e9e2e063 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.offset;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -110,4 +111,25 @@ public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) {
this.lmqOffsetTable = lmqOffsetTable;
}
+
+ @Override
+ public void removeOffset(String group) {
+ if (!MixAll.isLmq(group)) {
+ super.removeOffset(group);
+ return;
+ }
+ Iterator<Map.Entry<String, Long>> it = this.lmqOffsetTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, Long> next = it.next();
+ String topicAtGroup = next.getKey();
+ if (topicAtGroup.contains(group)) {
+ String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+ if (arrays.length == 2 && group.equals(arrays[1])) {
+ it.remove();
+ removeConsumerOffset(topicAtGroup);
+ LOG.warn("clean lmq group offset {}", topicAtGroup);
+ }
+ }
+ }
+ }
}