This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 82a6a784ee [ISSUE #10423] Optimize 
LiteConsumerLagCalculator.getLagCountTopK by deferring getStoreTimestamp to 
topK results only (#10424)
82a6a784ee is described below

commit 82a6a784ee35b36d94b3ab644bd5394834e2c540
Author: Quan <[email protected]>
AuthorDate: Mon Jun 8 14:15:29 2026 +0800

    [ISSUE #10423] Optimize LiteConsumerLagCalculator.getLagCountTopK by 
deferring getStoreTimestamp to topK results only (#10424)
---
 .../broker/metrics/LiteConsumerLagCalculator.java  | 72 +++++++++++++---------
 .../metrics/LiteConsumerLagCalculatorTest.java     | 15 ++++-
 .../apache/rocketmq/common/lite/LiteLagInfo.java   | 13 ++++
 .../command/lite/GetLiteGroupInfoSubCommand.java   | 19 +++---
 4 files changed, 79 insertions(+), 40 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculator.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculator.java
index abde27670c..7f0dcbed5c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculator.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculator.java
@@ -28,12 +28,12 @@ import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.lite.LiteMetadataUtil;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.entity.TopicGroup;
@@ -212,23 +212,21 @@ public class LiteConsumerLagCalculator {
     ) {
         // Use a min heap to maintain the largest topK lag counts
         PriorityQueue<LiteLagInfo> minHeap = new PriorityQueue<>(topK, 
Comparator.comparingLong(LiteLagInfo::getLagCount));
-        AtomicLong totalLagCount = new AtomicLong(0L);
+        long[] totalLagCount = {0L};
 
         offsetTableForEachByGroup(group, (topicGroup, consumerOffset) -> {
-            String topic = topicGroup.topic;
-
-            long diff = offsetDiff(consumerOffset, topic);
+            String lmqName = topicGroup.topic;
+            long diff = offsetDiff(consumerOffset, lmqName);
             if (diff > 0) {
-                totalLagCount.addAndGet(diff);
-                LiteLagInfo liteLagInfo = new LiteLagInfo();
-                liteLagInfo.setLiteTopic(LiteUtil.getLiteTopic(topic));
-                liteLagInfo.setLagCount(diff);
-                
liteLagInfo.setEarliestUnconsumedTimestamp(getStoreTimestamp(topic, 
consumerOffset));
-
-                if (minHeap.size() < topK) {
-                    minHeap.offer(liteLagInfo);
-                } else if (minHeap.peek() != null && liteLagInfo.getLagCount() 
> minHeap.peek().getLagCount()) {
-                    minHeap.poll();
+                totalLagCount[0] += diff;
+                if (minHeap.size() < topK || minHeap.peek() != null && diff > 
minHeap.peek().getLagCount()) {
+                    LiteLagInfo liteLagInfo = new LiteLagInfo();
+                    liteLagInfo.setLiteTopic(LiteUtil.getLiteTopic(lmqName));
+                    liteLagInfo.setLagCount(diff);
+                    liteLagInfo.setConsumerOffset(consumerOffset);
+                    if (minHeap.size() >= topK) {
+                        minHeap.poll();
+                    }
                     minHeap.offer(liteLagInfo);
                 }
             }
@@ -238,7 +236,17 @@ public class LiteConsumerLagCalculator {
         List<LiteLagInfo> topList = new ArrayList<>(minHeap);
         
topList.sort(Comparator.comparingLong(LiteLagInfo::getLagCount).reversed());
 
-        return Pair.of(topList, totalLagCount.get());
+        // Compute getStoreTimestamp only for topK results (expensive 
operation)
+        String parentTopic = LiteMetadataUtil.getLiteBindTopic(group, 
brokerController);
+        for (LiteLagInfo lagInfo : topList) {
+            long consumerOffset = lagInfo.getConsumerOffset();
+            if (consumerOffset >= 0) {
+                String lmqName = LiteUtil.toLmqName(parentTopic, 
lagInfo.getLiteTopic());
+                
lagInfo.setEarliestUnconsumedTimestamp(getStoreTimestamp(lmqName, 
consumerOffset));
+            }
+        }
+
+        return Pair.of(topList, totalLagCount[0]);
     }
 
     /**
@@ -254,21 +262,25 @@ public class LiteConsumerLagCalculator {
         ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
             brokerController.getConsumerOffsetManager().getOffsetTable();
         offsetTable.forEach((topicAtGroup, queueOffset) -> {
-            String[] topicGroup = 
topicAtGroup.split(ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR);
-            if (topicGroup.length == 2) {
-                if (!LiteUtil.isLiteTopicQueue(topicGroup[0])) {
-                    return;
-                }
-                // If group specified, only process the matching group
-                if (StringUtils.isEmpty(group) || group.equals(topicGroup[1])) 
{
-                    TopicGroup tg = new TopicGroup(topicGroup[0], 
topicGroup[1]);
-                    Long consumerOffset = queueOffset.get(0);
-                    if (consumerOffset == null) {
-                        return;
-                    }
-                    consumer.accept(tg, consumerOffset);
-                }
+            int sepIdx = 
topicAtGroup.indexOf(ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR);
+            if (sepIdx <= 0 || sepIdx == topicAtGroup.length() - 1) {
+                return;
+            }
+            // Early check lite prefix before any string allocation
+            if (!LiteUtil.isLiteTopicQueue(topicAtGroup)) {
+                return;
+            }
+            // Only extract group substring when needed for comparison
+            String entryGroup = topicAtGroup.substring(sepIdx + 1);
+            if (StringUtils.isNotEmpty(group) && !group.equals(entryGroup)) {
+                return;
+            }
+            Long consumerOffset = queueOffset.get(0);
+            if (consumerOffset == null) {
+                return;
             }
+            String topic = topicAtGroup.substring(0, sepIdx);
+            consumer.accept(new TopicGroup(topic, entryGroup), consumerOffset);
         });
     }
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java
index 732ca7dfbd..d2d6b7244a 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.entity.TopicGroup;
 import org.apache.rocketmq.common.lite.LiteLagInfo;
 import org.apache.rocketmq.common.lite.LiteUtil;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -55,6 +57,9 @@ public class LiteConsumerLagCalculatorTest {
     @Mock
     private ConsumerOffsetManager consumerOffsetManager;
 
+    @Mock
+    private SubscriptionGroupManager subscriptionGroupManager;
+
     private final BrokerConfig brokerConfig = new BrokerConfig();
 
     @Before
@@ -276,10 +281,16 @@ public class LiteConsumerLagCalculatorTest {
 
         when(consumerOffsetManager.getOffsetTable()).thenReturn(offsetTable);
 
+        // Mock SubscriptionGroupManager to return liteBindTopic
+        
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteBindTopic(topic);
+        
when(subscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+
         // Mock store timestamps
         long timestamp1 = 1000L;
         long timestamp2 = 2000L;
-        long timestamp3 = 1500L;
 
         // Create a spy of the calculator to allow partial mocking
         LiteConsumerLagCalculator spyCalculator = 
spy(liteConsumerLagCalculator);
@@ -287,8 +298,6 @@ public class LiteConsumerLagCalculatorTest {
         // Mock getStoreTimestamp method on the spy
         doReturn(timestamp1).when(spyCalculator).getStoreTimestamp(lmqName1, 
consumerOffset1);
         doReturn(timestamp2).when(spyCalculator).getStoreTimestamp(lmqName2, 
consumerOffset2);
-        doReturn(timestamp3).when(spyCalculator).getStoreTimestamp(lmqName3, 
consumerOffset3);
-
         // Mock getMaxOffset method on the spy
         doReturn(100L).when(spyCalculator).getMaxOffset(lmqName1);
         doReturn(80L).when(spyCalculator).getMaxOffset(lmqName2);
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/lite/LiteLagInfo.java 
b/common/src/main/java/org/apache/rocketmq/common/lite/LiteLagInfo.java
index 5a3caf0371..8e74e3e1b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/lite/LiteLagInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/lite/LiteLagInfo.java
@@ -16,12 +16,17 @@
  */
 package org.apache.rocketmq.common.lite;
 
+import com.alibaba.fastjson2.annotation.JSONField;
+
 public class LiteLagInfo {
     private String liteTopic;
     private long lagCount;
     // earliest unconsumed timestamp
     private long earliestUnconsumedTimestamp = -1;
 
+    @JSONField(serialize = false)
+    private long consumerOffset = -1;
+
     public String getLiteTopic() {
         return liteTopic;
     }
@@ -45,4 +50,12 @@ public class LiteLagInfo {
     public void setEarliestUnconsumedTimestamp(long 
earliestUnconsumedTimestamp) {
         this.earliestUnconsumedTimestamp = earliestUnconsumedTimestamp;
     }
+
+    public long getConsumerOffset() {
+        return consumerOffset;
+    }
+
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/lite/GetLiteGroupInfoSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/lite/GetLiteGroupInfoSubCommand.java
index 6fc17dc523..2286e61388 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/lite/GetLiteGroupInfoSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/lite/GetLiteGroupInfoSubCommand.java
@@ -89,7 +89,7 @@ public class GetLiteGroupInfoSubCommand implements SubCommand 
{
             System.out.printf("Lite Group Info: [%s] [%s]%n", group, 
parentTopic);
 
             long totalLagCount = 0;
-            long earliestUnconsumedTimestamp = System.currentTimeMillis();
+            long earliestUnconsumedTimestamp = -1;
             List<LiteLagInfo> lagCountTopK = new ArrayList<>();
             List<LiteLagInfo> lagTimestampTopK = new ArrayList<>();
 
@@ -111,8 +111,9 @@ public class GetLiteGroupInfoSubCommand implements 
SubCommand {
                 try {
                     GetLiteGroupInfoResponseBody body = 
defaultMQAdminExt.getLiteGroupInfo(brokerAddr, group, liteTopic, topK);
                     totalLagCount += body.getTotalLagCount() > 0 ? 
body.getTotalLagCount() : 0;
-                    if (body.getEarliestUnconsumedTimestamp() > 0) {
-                        earliestUnconsumedTimestamp = 
Math.min(earliestUnconsumedTimestamp, body.getEarliestUnconsumedTimestamp());
+                    long ts = body.getEarliestUnconsumedTimestamp();
+                    if (ts > 0 && (earliestUnconsumedTimestamp < 0 || ts < 
earliestUnconsumedTimestamp)) {
+                        earliestUnconsumedTimestamp = ts;
                     }
                     printOffsetWrapper(queryByLiteTopic, 
brokerData.getBrokerName(), body.getLiteTopicOffsetWrapper());
                     lagCountTopK.addAll(body.getLagCountTopK() != null ? 
body.getLagCountTopK() : Collections.emptyList());
@@ -122,17 +123,15 @@ public class GetLiteGroupInfoSubCommand implements 
SubCommand {
                 }
             }
 
-            System.out.printf("Total Lag Count: %d%n", totalLagCount);
-            long lagTime = System.currentTimeMillis() - 
earliestUnconsumedTimestamp;
-            System.out.printf("Min Unconsumed Timestamp: %d (%d s ago)%n%n", 
earliestUnconsumedTimestamp, lagTime / 1000);
-
             if (queryByLiteTopic) {
+                System.out.printf("Total Lag Count: %d%n", totalLagCount);
                 return;
             }
 
             // Sort and print topK lagCountTopK
             lagCountTopK.sort((o1, o2) -> Long.compare(o2.getLagCount(), 
o1.getLagCount()));
             System.out.printf("------TopK by lag count-----%n");
+            System.out.printf("Total Lag Count: %d%n", totalLagCount);
             System.out.printf("%-6s %-40s %-12s %-30s%n", "NO", "Lite Topic", 
"Lag Count", "UnconsumedTimestamp");
             for (int i = 0; i < lagCountTopK.size(); i++) {
                 LiteLagInfo info = lagCountTopK.get(i);
@@ -144,6 +143,12 @@ public class GetLiteGroupInfoSubCommand implements 
SubCommand {
             // Sort and print topK lagTimestampTopK
             
lagTimestampTopK.sort(Comparator.comparingLong(LiteLagInfo::getEarliestUnconsumedTimestamp));
             System.out.printf("%n------TopK by lag time------%n");
+            if (earliestUnconsumedTimestamp > 0) {
+                long lagTime = System.currentTimeMillis() - 
earliestUnconsumedTimestamp;
+                System.out.printf("Min Unconsumed Timestamp: %d (%d s ago)%n", 
earliestUnconsumedTimestamp, lagTime / 1000);
+            } else {
+                System.out.printf("Min Unconsumed Timestamp: -1 (lag time topK 
may not enabled)%n");
+            }
             System.out.printf("%-6s %-40s %-12s %-30s%n", "NO", "Lite Topic", 
"Lag Count", "UnconsumedTimestamp");
             for (int i = 0; i < lagTimestampTopK.size(); i++) {
                 LiteLagInfo info = lagTimestampTopK.get(i);

Reply via email to