This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 82a6a784ee [ISSUE #10423] Optimize
LiteConsumerLagCalculator.getLagCountTopK by deferring getStoreTimestamp to
topK results only (#10424)
82a6a784ee is described below
commit 82a6a784ee35b36d94b3ab644bd5394834e2c540
Author: Quan <[email protected]>
AuthorDate: Mon Jun 8 14:15:29 2026 +0800
[ISSUE #10423] Optimize LiteConsumerLagCalculator.getLagCountTopK by
deferring getStoreTimestamp to topK results only (#10424)
---
.../broker/metrics/LiteConsumerLagCalculator.java | 72 +++++++++++++---------
.../metrics/LiteConsumerLagCalculatorTest.java | 15 ++++-
.../apache/rocketmq/common/lite/LiteLagInfo.java | 13 ++++
.../command/lite/GetLiteGroupInfoSubCommand.java | 19 +++---
4 files changed, 79 insertions(+), 40 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculator.java
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculator.java
index abde27670c..7f0dcbed5c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculator.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculator.java
@@ -28,12 +28,12 @@ import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.lite.LiteMetadataUtil;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.entity.TopicGroup;
@@ -212,23 +212,21 @@ public class LiteConsumerLagCalculator {
) {
// Use a min heap to maintain the largest topK lag counts
PriorityQueue<LiteLagInfo> minHeap = new PriorityQueue<>(topK,
Comparator.comparingLong(LiteLagInfo::getLagCount));
- AtomicLong totalLagCount = new AtomicLong(0L);
+ long[] totalLagCount = {0L};
offsetTableForEachByGroup(group, (topicGroup, consumerOffset) -> {
- String topic = topicGroup.topic;
-
- long diff = offsetDiff(consumerOffset, topic);
+ String lmqName = topicGroup.topic;
+ long diff = offsetDiff(consumerOffset, lmqName);
if (diff > 0) {
- totalLagCount.addAndGet(diff);
- LiteLagInfo liteLagInfo = new LiteLagInfo();
- liteLagInfo.setLiteTopic(LiteUtil.getLiteTopic(topic));
- liteLagInfo.setLagCount(diff);
-
liteLagInfo.setEarliestUnconsumedTimestamp(getStoreTimestamp(topic,
consumerOffset));
-
- if (minHeap.size() < topK) {
- minHeap.offer(liteLagInfo);
- } else if (minHeap.peek() != null && liteLagInfo.getLagCount()
> minHeap.peek().getLagCount()) {
- minHeap.poll();
+ totalLagCount[0] += diff;
+ if (minHeap.size() < topK || minHeap.peek() != null && diff >
minHeap.peek().getLagCount()) {
+ LiteLagInfo liteLagInfo = new LiteLagInfo();
+ liteLagInfo.setLiteTopic(LiteUtil.getLiteTopic(lmqName));
+ liteLagInfo.setLagCount(diff);
+ liteLagInfo.setConsumerOffset(consumerOffset);
+ if (minHeap.size() >= topK) {
+ minHeap.poll();
+ }
minHeap.offer(liteLagInfo);
}
}
@@ -238,7 +236,17 @@ public class LiteConsumerLagCalculator {
List<LiteLagInfo> topList = new ArrayList<>(minHeap);
topList.sort(Comparator.comparingLong(LiteLagInfo::getLagCount).reversed());
- return Pair.of(topList, totalLagCount.get());
+ // Compute getStoreTimestamp only for topK results (expensive
operation)
+ String parentTopic = LiteMetadataUtil.getLiteBindTopic(group,
brokerController);
+ for (LiteLagInfo lagInfo : topList) {
+ long consumerOffset = lagInfo.getConsumerOffset();
+ if (consumerOffset >= 0) {
+ String lmqName = LiteUtil.toLmqName(parentTopic,
lagInfo.getLiteTopic());
+
lagInfo.setEarliestUnconsumedTimestamp(getStoreTimestamp(lmqName,
consumerOffset));
+ }
+ }
+
+ return Pair.of(topList, totalLagCount[0]);
}
/**
@@ -254,21 +262,25 @@ public class LiteConsumerLagCalculator {
ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
brokerController.getConsumerOffsetManager().getOffsetTable();
offsetTable.forEach((topicAtGroup, queueOffset) -> {
- String[] topicGroup =
topicAtGroup.split(ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR);
- if (topicGroup.length == 2) {
- if (!LiteUtil.isLiteTopicQueue(topicGroup[0])) {
- return;
- }
- // If group specified, only process the matching group
- if (StringUtils.isEmpty(group) || group.equals(topicGroup[1]))
{
- TopicGroup tg = new TopicGroup(topicGroup[0],
topicGroup[1]);
- Long consumerOffset = queueOffset.get(0);
- if (consumerOffset == null) {
- return;
- }
- consumer.accept(tg, consumerOffset);
- }
+ int sepIdx =
topicAtGroup.indexOf(ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR);
+ if (sepIdx <= 0 || sepIdx == topicAtGroup.length() - 1) {
+ return;
+ }
+ // Early check lite prefix before any string allocation
+ if (!LiteUtil.isLiteTopicQueue(topicAtGroup)) {
+ return;
+ }
+ // Only extract group substring when needed for comparison
+ String entryGroup = topicAtGroup.substring(sepIdx + 1);
+ if (StringUtils.isNotEmpty(group) && !group.equals(entryGroup)) {
+ return;
+ }
+ Long consumerOffset = queueOffset.get(0);
+ if (consumerOffset == null) {
+ return;
}
+ String topic = topicAtGroup.substring(0, sepIdx);
+ consumer.accept(new TopicGroup(topic, entryGroup), consumerOffset);
});
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java
index 732ca7dfbd..d2d6b7244a 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.entity.TopicGroup;
import org.apache.rocketmq.common.lite.LiteLagInfo;
import org.apache.rocketmq.common.lite.LiteUtil;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -55,6 +57,9 @@ public class LiteConsumerLagCalculatorTest {
@Mock
private ConsumerOffsetManager consumerOffsetManager;
+ @Mock
+ private SubscriptionGroupManager subscriptionGroupManager;
+
private final BrokerConfig brokerConfig = new BrokerConfig();
@Before
@@ -276,10 +281,16 @@ public class LiteConsumerLagCalculatorTest {
when(consumerOffsetManager.getOffsetTable()).thenReturn(offsetTable);
+ // Mock SubscriptionGroupManager to return liteBindTopic
+
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteBindTopic(topic);
+
when(subscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+
// Mock store timestamps
long timestamp1 = 1000L;
long timestamp2 = 2000L;
- long timestamp3 = 1500L;
// Create a spy of the calculator to allow partial mocking
LiteConsumerLagCalculator spyCalculator =
spy(liteConsumerLagCalculator);
@@ -287,8 +298,6 @@ public class LiteConsumerLagCalculatorTest {
// Mock getStoreTimestamp method on the spy
doReturn(timestamp1).when(spyCalculator).getStoreTimestamp(lmqName1,
consumerOffset1);
doReturn(timestamp2).when(spyCalculator).getStoreTimestamp(lmqName2,
consumerOffset2);
- doReturn(timestamp3).when(spyCalculator).getStoreTimestamp(lmqName3,
consumerOffset3);
-
// Mock getMaxOffset method on the spy
doReturn(100L).when(spyCalculator).getMaxOffset(lmqName1);
doReturn(80L).when(spyCalculator).getMaxOffset(lmqName2);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/lite/LiteLagInfo.java
b/common/src/main/java/org/apache/rocketmq/common/lite/LiteLagInfo.java
index 5a3caf0371..8e74e3e1b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/lite/LiteLagInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/lite/LiteLagInfo.java
@@ -16,12 +16,17 @@
*/
package org.apache.rocketmq.common.lite;
+import com.alibaba.fastjson2.annotation.JSONField;
+
public class LiteLagInfo {
private String liteTopic;
private long lagCount;
// earliest unconsumed timestamp
private long earliestUnconsumedTimestamp = -1;
+ @JSONField(serialize = false)
+ private long consumerOffset = -1;
+
public String getLiteTopic() {
return liteTopic;
}
@@ -45,4 +50,12 @@ public class LiteLagInfo {
public void setEarliestUnconsumedTimestamp(long
earliestUnconsumedTimestamp) {
this.earliestUnconsumedTimestamp = earliestUnconsumedTimestamp;
}
+
+ public long getConsumerOffset() {
+ return consumerOffset;
+ }
+
+ public void setConsumerOffset(long consumerOffset) {
+ this.consumerOffset = consumerOffset;
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/lite/GetLiteGroupInfoSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/lite/GetLiteGroupInfoSubCommand.java
index 6fc17dc523..2286e61388 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/lite/GetLiteGroupInfoSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/lite/GetLiteGroupInfoSubCommand.java
@@ -89,7 +89,7 @@ public class GetLiteGroupInfoSubCommand implements SubCommand
{
System.out.printf("Lite Group Info: [%s] [%s]%n", group,
parentTopic);
long totalLagCount = 0;
- long earliestUnconsumedTimestamp = System.currentTimeMillis();
+ long earliestUnconsumedTimestamp = -1;
List<LiteLagInfo> lagCountTopK = new ArrayList<>();
List<LiteLagInfo> lagTimestampTopK = new ArrayList<>();
@@ -111,8 +111,9 @@ public class GetLiteGroupInfoSubCommand implements
SubCommand {
try {
GetLiteGroupInfoResponseBody body =
defaultMQAdminExt.getLiteGroupInfo(brokerAddr, group, liteTopic, topK);
totalLagCount += body.getTotalLagCount() > 0 ?
body.getTotalLagCount() : 0;
- if (body.getEarliestUnconsumedTimestamp() > 0) {
- earliestUnconsumedTimestamp =
Math.min(earliestUnconsumedTimestamp, body.getEarliestUnconsumedTimestamp());
+ long ts = body.getEarliestUnconsumedTimestamp();
+ if (ts > 0 && (earliestUnconsumedTimestamp < 0 || ts <
earliestUnconsumedTimestamp)) {
+ earliestUnconsumedTimestamp = ts;
}
printOffsetWrapper(queryByLiteTopic,
brokerData.getBrokerName(), body.getLiteTopicOffsetWrapper());
lagCountTopK.addAll(body.getLagCountTopK() != null ?
body.getLagCountTopK() : Collections.emptyList());
@@ -122,17 +123,15 @@ public class GetLiteGroupInfoSubCommand implements
SubCommand {
}
}
- System.out.printf("Total Lag Count: %d%n", totalLagCount);
- long lagTime = System.currentTimeMillis() -
earliestUnconsumedTimestamp;
- System.out.printf("Min Unconsumed Timestamp: %d (%d s ago)%n%n",
earliestUnconsumedTimestamp, lagTime / 1000);
-
if (queryByLiteTopic) {
+ System.out.printf("Total Lag Count: %d%n", totalLagCount);
return;
}
// Sort and print topK lagCountTopK
lagCountTopK.sort((o1, o2) -> Long.compare(o2.getLagCount(),
o1.getLagCount()));
System.out.printf("------TopK by lag count-----%n");
+ System.out.printf("Total Lag Count: %d%n", totalLagCount);
System.out.printf("%-6s %-40s %-12s %-30s%n", "NO", "Lite Topic",
"Lag Count", "UnconsumedTimestamp");
for (int i = 0; i < lagCountTopK.size(); i++) {
LiteLagInfo info = lagCountTopK.get(i);
@@ -144,6 +143,12 @@ public class GetLiteGroupInfoSubCommand implements
SubCommand {
// Sort and print topK lagTimestampTopK
lagTimestampTopK.sort(Comparator.comparingLong(LiteLagInfo::getEarliestUnconsumedTimestamp));
System.out.printf("%n------TopK by lag time------%n");
+ if (earliestUnconsumedTimestamp > 0) {
+ long lagTime = System.currentTimeMillis() -
earliestUnconsumedTimestamp;
+ System.out.printf("Min Unconsumed Timestamp: %d (%d s ago)%n",
earliestUnconsumedTimestamp, lagTime / 1000);
+ } else {
+ System.out.printf("Min Unconsumed Timestamp: -1 (lag time topK
may not enabled)%n");
+ }
System.out.printf("%-6s %-40s %-12s %-30s%n", "NO", "Lite Topic",
"Lag Count", "UnconsumedTimestamp");
for (int i = 0; i < lagTimestampTopK.size(); i++) {
LiteLagInfo info = lagTimestampTopK.get(i);