This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0176ef4fac [ISSUE #9149]Assign offset in offsetTable even if the
subscription key not exist. (#9150)
0176ef4fac is described below
commit 0176ef4fac80ace7b972088381c80d71f7475a6e
Author: dingshuangxi888 <[email protected]>
AuthorDate: Thu Feb 6 16:48:31 2025 +0800
[ISSUE #9149]Assign offset in offsetTable even if the subscription key not
exist. (#9150)
* Assign offset in offsetTable even if the subscription key not exist.
---
.../broker/offset/ConsumerOffsetManager.java | 20 ++++----------------
1 file changed, 4 insertions(+), 16 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ea46f1d8a1..85bc8e3789 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.offset;
+import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -417,27 +418,14 @@ public class ConsumerOffsetManager extends ConfigManager {
}
String key = topic + TOPIC_GROUP_SEPARATOR + group;
- ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
- if (null == map) {
- map = new ConcurrentHashMap<Integer, Long>();
- ConcurrentMap<Integer, Long> previous =
resetOffsetTable.putIfAbsent(key, map);
- if (null != previous) {
- map = previous;
- }
- }
-
- map.put(queueId, offset);
- LOG.debug("Reset offset OK. Topic={}, group={}, queueId={},
resetOffset={}",
- topic, group, queueId, offset);
+ resetOffsetTable.computeIfAbsent(key, k ->
Maps.newConcurrentMap()).put(queueId, offset);
+ LOG.debug("Reset offset OK. Topic={}, group={}, queueId={},
resetOffset={}", topic, group, queueId, offset);
// Two things are important here:
// 1, currentOffsetMap might be null if there is no previous records;
// 2, Our overriding here may get overridden by the client instantly
in concurrent cases; But it still makes
// sense in cases like clients are offline.
- ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key);
- if (null != currentOffsetMap) {
- currentOffsetMap.put(queueId, offset);
- }
+ offsetTable.computeIfAbsent(key, k ->
Maps.newConcurrentMap()).put(queueId, offset);
}
public boolean hasOffsetReset(String topic, String group, int queueId) {