This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 61ea51252c [ISSUE #9172] Clean pull offset and reset offset when
delete subscription group (#9173)
61ea51252c is described below
commit 61ea51252c7e2e8c9ce2f53702f8c66356501ada
Author: lizhimins <[email protected]>
AuthorDate: Wed Feb 12 19:09:55 2025 +0800
[ISSUE #9172] Clean pull offset and reset offset when delete subscription
group (#9173)
* [ISSUE #9172] Clean pull offset and reset offset when delete subscription
group
* [ISSUE #9174] Add a collection of predefined Groups and common checking
methods in the MixAll (#9175)
Signed-off-by: ltamber <[email protected]>
* [ISSUE #9177] Fix unstable tests in AdaptiveLockTest.testAdaptiveLock
(#9178)
---------
Signed-off-by: ltamber <[email protected]>
Co-authored-by: ltamber <[email protected]>
Co-authored-by: hqbfz <[email protected]>
---
.../broker/offset/ConsumerOffsetManager.java | 33 ++++++++++++++--------
.../broker/offset/ConsumerOffsetManagerTest.java | 14 +++++++++
2 files changed, 36 insertions(+), 11 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 85bc8e3789..eafb47a89d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Strings;
+import java.util.function.Function;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -395,19 +396,29 @@ public class ConsumerOffsetManager extends ConfigManager {
}
public void removeOffset(final String group) {
- Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
- String topicAtGroup = next.getKey();
- if (topicAtGroup.contains(group)) {
- String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
- if (arrays.length == 2 && group.equals(arrays[1])) {
- it.remove();
- removeConsumerOffset(topicAtGroup);
- LOG.warn("clean group offset {}", topicAtGroup);
+ Function<Iterator<Entry<String, ConcurrentMap<Integer, Long>>>, Boolean> deleteFunction = it -> {
+ boolean removed = false;
+ while (it.hasNext()) {
+ Entry<String, ConcurrentMap<Integer, Long>> entry = it.next();
+ String topicAtGroup = entry.getKey();
+ if (topicAtGroup.contains(group)) {
+ String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+ if (arrays.length == 2 && group.equals(arrays[1])) {
+ it.remove();
+ removeConsumerOffset(topicAtGroup);
+ removed = true;
+ }
}
}
- }
+ return removed;
+ };
+
+ boolean clearOffset = deleteFunction.apply(this.offsetTable.entrySet().iterator());
+ boolean clearReset = deleteFunction.apply(this.resetOffsetTable.entrySet().iterator());
+ boolean clearPull = deleteFunction.apply(this.pullOffsetTable.entrySet().iterator());
+
+ LOG.info("Consumer offset manager clean group offset, groupName={}, " +
+ "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull);
}
public void assignResetOffset(String topic, String group, int queueId, long offset) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index 7bd289a6f1..9fc553409d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.broker.offset;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Assert;
import org.junit.Before;
@@ -27,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.mockito.Mockito;
+import static org.apache.rocketmq.broker.offset.ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumerOffsetManagerTest {
@@ -65,6 +67,18 @@ public class ConsumerOffsetManagerTest {
assertThat(!consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue();
}
+ @Test
+ public void removeOffsetByGroupTest() {
+ String topic = "TopicName";
+ String group = "GroupName";
+ Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
+ consumerOffsetManager.commitOffset("Commit", group, topic, 0, 100);
+ consumerOffsetManager.assignResetOffset(topic, group, 0, 100);
+ consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100);
+ consumerOffsetManager.removeOffset(group);
+ Assert.assertFalse(consumerOffsetManager.getOffsetTable().containsKey(topic + TOPIC_GROUP_SEPARATOR + group));
+ }
+
@Test
public void testOffsetPersistInMemory() {
ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable = consumerOffsetManager.getOffsetTable();