This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f32fe78ca0 [ISSUE #9025] [RIP-73] Fix Pop Consumption reset offset (#9087)
f32fe78ca0 is described below
commit f32fe78ca039fc2fec3341323dc61e8a9e486368
Author: lizhimins <[email protected]>
AuthorDate: Mon Dec 30 20:05:20 2024 +0800
[ISSUE #9025] [RIP-73] Fix Pop Consumption reset offset (#9087)
---
.../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 5856873955..6bcf9aaa0f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2187,9 +2187,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
ResetOffsetBody body = new ResetOffsetBody();
String brokerName = brokerController.getBrokerConfig().getBrokerName();
for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
- brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey());
+ if (brokerController.getPopInflightMessageCounter() != null) {
+ brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey());
+ }
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
- brokerController.getPopConsumerService().clearCache(group, topic, queueId);
+ brokerController.getPopConsumerService().clearCache(group, topic, entry.getKey());
+ brokerController.getConsumerOffsetManager().commitPullOffset(
+ "ResetOffsetInner", group, topic, entry.getKey(), entry.getValue());
}
body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue());
}