This is an automated email from the ASF dual-hosted git repository.
tianliuliu pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9337904cdf [ISSUE #9309]opti:Avoid the generation of dirty data in
#assignResetOffset (#9310)
9337904cdf is described below
commit 9337904cdf44e9de74204bf1bfb1b5e82a4b7f59
Author: hqbfz <[email protected]>
AuthorDate: Wed Jul 9 09:56:26 2025 +0800
[ISSUE #9309]opti:Avoid the generation of dirty data in #assignResetOffset
(#9310)
* feat: support clients to reset lmq consumption offset
* fix
* fix
* fix
* fix: clean pull offset in #removeOffset
* fix: clean pull offset in #removeOffset
* rerun test
---------
Co-authored-by: hqbfzwang <[email protected]>
---
.../broker/config/v2/ConsumerOffsetManagerV2.java | 26 ++++++++++++++++++++
.../broker/offset/ConsumerOffsetManager.java | 2 +-
.../broker/offset/LmqConsumerOffsetManager.java | 28 ++++++++++++++++++++++
3 files changed, 55 insertions(+), 1 deletion(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
index 28214baf1c..e14ac0bb62 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.config.v2;
+import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
@@ -446,4 +447,29 @@ public class ConsumerOffsetManagerV2 extends
ConsumerOffsetManager {
}
return -1;
}
+
+ @Override
+ public void assignResetOffset(String topic, String group, int queueId,
long offset) {
+ if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) ||
queueId < 0 || offset < 0) {
+ LOG.warn("Illegal arguments when assigning reset offset. Topic={},
group={}, queueId={}, offset={}",
+ topic, group, queueId, offset);
+ return;
+ }
+ if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {
+ super.assignResetOffset(topic, group, queueId, offset);
+ } else {
+ String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+ if (null == map) {
+ map = new ConcurrentHashMap<>();
+ ConcurrentMap<Integer, Long> previous =
resetOffsetTable.putIfAbsent(key, map);
+ if (null != previous) {
+ map = previous;
+ }
+ }
+ map.put(queueId, offset);
+ }
+
+ this.commitOffset(null, topic, group, queueId, offset);
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 140604f521..a6cd9ad987 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -50,7 +50,7 @@ public class ConsumerOffsetManager extends ConfigManager {
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer,
Long>> offsetTable =
new ConcurrentHashMap<>(512);
- private final ConcurrentMap<String, ConcurrentMap<Integer, Long>>
resetOffsetTable =
+ protected final ConcurrentMap<String, ConcurrentMap<Integer, Long>>
resetOffsetTable =
new ConcurrentHashMap<>(512);
private final ConcurrentMap<String/* topic@group */,
ConcurrentMap<Integer, Long>> pullOffsetTable =
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
index 53e9e2e063..a565ad07c3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
@@ -20,7 +20,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import com.google.common.base.Strings;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.MixAll;
@@ -132,4 +134,30 @@ public class LmqConsumerOffsetManager extends
ConsumerOffsetManager {
}
}
}
+
+ @Override
+ public void assignResetOffset(String topic, String group, int queueId,
long offset) {
+ if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) ||
queueId < 0 || offset < 0) {
+ LOG.warn("Illegal arguments when assigning reset offset. Topic={},
group={}, queueId={}, offset={}",
+ topic, group, queueId, offset);
+ return;
+ }
+ if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {
+ super.assignResetOffset(topic, group, queueId, offset);
+ return;
+ }
+
+ String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+ if (null == map) {
+ map = new ConcurrentHashMap<>();
+ ConcurrentMap<Integer, Long> previous =
resetOffsetTable.putIfAbsent(key, map);
+ if (null != previous) {
+ map = previous;
+ }
+ }
+ map.put(queueId, offset);
+
+ lmqOffsetTable.computeIfPresent(key, (k, oldValue) -> offset);
+ }
}