This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 81e3648e3 Introduce retry topic and lag estimation for lag 
calculation. (#5702)
81e3648e3 is described below

commit 81e3648e3fcb1995897db3960a4561d6047b696c
Author: SSpirits <[email protected]>
AuthorDate: Fri Dec 16 16:48:56 2022 +0800

    Introduce retry topic and lag estimation for lag calculation. (#5702)
---
 .../broker/metrics/BrokerMetricsConstant.java      |   2 +-
 .../broker/metrics/BrokerMetricsManager.java       |  25 +--
 .../broker/metrics/ConsumerLagCalculator.java      | 188 ++++++++++++++++-----
 .../processor/AbstractSendMessageProcessor.java    |  15 +-
 .../broker/processor/PopMessageProcessor.java      |  13 ++
 .../broker/processor/SendMessageProcessor.java     |  10 +-
 .../store/metrics/DefaultStoreMetricsConstant.java |   1 -
 7 files changed, 186 insertions(+), 68 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
index c7b6aa8e1..73b40f6ba 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
@@ -36,7 +36,7 @@ public class BrokerMetricsConstant {
     public static final String GAUGE_CONSUMER_INFLIGHT_MESSAGES = 
"rocketmq_consumer_inflight_messages";
     public static final String GAUGE_CONSUMER_QUEUEING_LATENCY = 
"rocketmq_consumer_queueing_latency";
     public static final String GAUGE_CONSUMER_READY_MESSAGES = 
"rocketmq_consumer_ready_messages";
-    public static final String GAUGE_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = 
"rocketmq_send_to_dlq_messages_total";
+    public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = 
"rocketmq_send_to_dlq_messages_total";
 
     public static final String LABEL_CLUSTER_NAME = "cluster";
     public static final String LABEL_NODE_TYPE = "node_type";
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index a1d4f5917..5fb8085a5 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -65,6 +65,7 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
 
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
@@ -76,7 +77,6 @@ import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CON
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES;
-import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
@@ -84,6 +84,7 @@ import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGG
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID;
@@ -104,7 +105,6 @@ public class BrokerMetricsManager {
     private final BrokerController brokerController;
     private final ConsumerLagCalculator consumerLagCalculator;
     private final static Map<String, String> LABEL_MAP = new HashMap<>();
-    private final Map<String, Long> dlqOffsetMap = new HashMap<>();
     private OtlpGrpcMetricExporter metricExporter;
     private PeriodicMetricReader periodicMetricReader;
     private PrometheusHttpServer prometheusHttpServer;
@@ -131,7 +131,7 @@ public class BrokerMetricsManager {
     public static ObservableLongGauge consumerInflightMessages = new 
NopObservableLongGauge();
     public static ObservableLongGauge consumerQueueingLatency = new 
NopObservableLongGauge();
     public static ObservableLongGauge consumerReadyMessages = new 
NopObservableLongGauge();
-    public static ObservableLongGauge sendToDlqMessages = new 
NopObservableLongGauge();
+    public static LongCounter sendToDlqMessages = new NopLongCounter();
 
     public BrokerMetricsManager(BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -151,6 +151,7 @@ public class BrokerMetricsManager {
         AttributesBuilder attributesBuilder = newAttributesBuilder();
         attributesBuilder.put(LABEL_CONSUMER_GROUP, result.group);
         attributesBuilder.put(LABEL_TOPIC, result.topic);
+        attributesBuilder.put(LABEL_IS_RETRY, result.isRetry);
         attributesBuilder.put(LABEL_IS_SYSTEM, isSystem(result.topic, 
result.group));
         return attributesBuilder.build();
     }
@@ -487,23 +488,9 @@ public class BrokerMetricsManager {
             .buildWithCallback(measurement ->
                 consumerLagCalculator.calculateAvailable(result -> 
measurement.record(result.available, buildLagAttributes(result))));
 
-        sendToDlqMessages = 
brokerMeter.gaugeBuilder(GAUGE_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL)
+        sendToDlqMessages = 
brokerMeter.counterBuilder(COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL)
             .setDescription("Consumer send to DLQ messages")
-            .ofLongs()
-            .buildWithCallback(measurement -> 
consumerLagCalculator.calculateSendToDLQ(result -> {
-                long val = result.dlqMessageCount;
-                if (brokerConfig.isMetricsInDelta()) {
-                    String key = result.group + "%%" + result.topic;
-                    Long lastOffset = dlqOffsetMap.computeIfAbsent(key, k -> 
result.dlqMessageCount);
-                    dlqOffsetMap.put(key, result.dlqMessageCount);
-                    val -= lastOffset;
-                }
-                AttributesBuilder attributesBuilder = newAttributesBuilder();
-                attributesBuilder.put(LABEL_CONSUMER_GROUP, result.group);
-                attributesBuilder.put(LABEL_TOPIC, result.topic);
-                attributesBuilder.put(LABEL_IS_SYSTEM, isSystem(result.topic, 
result.group));
-                measurement.record(val, attributesBuilder.build());
-            }));
+            .build();
     }
 
     private void initOtherMetrics() {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
index befcf6e53..4b8767de5 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java
@@ -22,22 +22,29 @@ import java.util.function.Consumer;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.processor.PopBufferMergeService;
+import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.DefaultMessageFilter;
 import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 
 public class ConsumerLagCalculator {
     private final BrokerConfig brokerConfig;
@@ -48,6 +55,7 @@ public class ConsumerLagCalculator {
     private final SubscriptionGroupManager subscriptionGroupManager;
     private final MessageStore messageStore;
     private final PopBufferMergeService popBufferMergeService;
+    private final PopInflightMessageCounter popInflightMessageCounter;
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -60,25 +68,33 @@ public class ConsumerLagCalculator {
         this.subscriptionGroupManager = 
brokerController.getSubscriptionGroupManager();
         this.messageStore = brokerController.getMessageStore();
         this.popBufferMergeService = 
brokerController.getPopMessageProcessor().getPopBufferMergeService();
+        this.popInflightMessageCounter = 
brokerController.getPopInflightMessageCounter();
     }
 
     private static class ProcessGroupInfo {
         public String group;
         public String topic;
+        public boolean isPop;
+        public String retryTopic;
 
-        public ProcessGroupInfo(String group, String topic) {
+        public ProcessGroupInfo(String group, String topic, boolean isPop,
+            String retryTopic) {
             this.group = group;
             this.topic = topic;
+            this.isPop = isPop;
+            this.retryTopic = retryTopic;
         }
     }
 
     public static class BaseCalculateResult {
         public String group;
         public String topic;
+        public boolean isRetry;
 
-        public BaseCalculateResult(String group, String topic) {
+        public BaseCalculateResult(String group, String topic, boolean 
isRetry) {
             this.group = group;
             this.topic = topic;
+            this.isRetry = isRetry;
         }
     }
 
@@ -86,8 +102,8 @@ public class ConsumerLagCalculator {
         public long lag;
         public long earliestUnconsumedTimestamp;
 
-        public CalculateLagResult(String group, String topic) {
-            super(group, topic);
+        public CalculateLagResult(String group, String topic, boolean isRetry) 
{
+            super(group, topic, isRetry);
         }
     }
 
@@ -95,16 +111,16 @@ public class ConsumerLagCalculator {
         public long inFlight;
         public long earliestUnPulledTimestamp;
 
-        public CalculateInflightResult(String group, String topic) {
-            super(group, topic);
+        public CalculateInflightResult(String group, String topic, boolean 
isRetry) {
+            super(group, topic, isRetry);
         }
     }
 
     public static class CalculateAvailableResult extends BaseCalculateResult {
         public long available;
 
-        public CalculateAvailableResult(String group, String topic) {
-            super(group, topic);
+        public CalculateAvailableResult(String group, String topic, boolean 
isRetry) {
+            super(group, topic, isRetry);
         }
     }
 
@@ -112,7 +128,7 @@ public class ConsumerLagCalculator {
         public long dlqMessageCount;
 
         public CalculateSendToDLQResult(String group, String topic) {
-            super(group, topic);
+            super(group, topic, false);
         }
     }
 
@@ -122,10 +138,11 @@ public class ConsumerLagCalculator {
             String group = subscriptionEntry.getKey();
             SubscriptionGroupConfig subscriptionGroupConfig = 
subscriptionEntry.getValue();
 
-            ConsumerGroupInfo consumerGroupInfo = 
consumerManager.getConsumerGroupInfo(group);
+            ConsumerGroupInfo consumerGroupInfo = 
consumerManager.getConsumerGroupInfo(group, true);
             if (consumerGroupInfo == null) {
                 continue;
             }
+            boolean isPop = consumerGroupInfo.getConsumeType() == 
ConsumeType.CONSUME_POP;
             Set<String> topics = consumerGroupInfo.getSubscribeTopics();
             if (null == topics || topics.isEmpty()) {
                 continue;
@@ -148,73 +165,94 @@ public class ConsumerLagCalculator {
                     continue;
                 }
 
-                consumer.accept(new ProcessGroupInfo(group, topic));
+                if (isPop) {
+                    String retryTopic = KeyBuilder.buildPopRetryTopic(topic, 
group);
+                    TopicConfig retryTopicConfig = 
topicConfigManager.selectTopicConfig(retryTopic);
+                    int retryTopicPerm = topicConfig.getPerm() & 
brokerConfig.getBrokerPermission();
+                    if (PermName.isReadable(retryTopicPerm) || 
PermName.isWriteable(retryTopicPerm)) {
+                        consumer.accept(new ProcessGroupInfo(group, topic, 
true, retryTopic));
+                    } else {
+                        consumer.accept(new ProcessGroupInfo(group, topic, 
true, null));
+                    }
+                } else {
+                    consumer.accept(new ProcessGroupInfo(group, topic, false, 
null));
+                }
             }
         }
     }
 
     public void calculateLag(Consumer<CalculateLagResult> lagRecorder) {
         processAllGroup(info -> {
-            CalculateLagResult result = new CalculateLagResult(info.group, 
info.topic);
+            CalculateLagResult result = new CalculateLagResult(info.group, 
info.topic, false);
 
-            Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic);
+            Pair<Long, Long> lag = getConsumerLagStats(info.group, info.topic, 
info.isPop);
             if (lag != null) {
                 result.lag = lag.getObject1();
                 result.earliestUnconsumedTimestamp = lag.getObject2();
             }
             lagRecorder.accept(result);
+
+            if (info.isPop) {
+                Pair<Long, Long> retryLag = getConsumerLagStats(info.group, 
info.retryTopic, true);
+
+                result = new CalculateLagResult(info.group, info.topic, true);
+                if (retryLag != null) {
+                    result.lag = retryLag.getObject1();
+                    result.earliestUnconsumedTimestamp = retryLag.getObject2();
+                }
+                lagRecorder.accept(result);
+            }
         });
     }
 
     public void calculateInflight(Consumer<CalculateInflightResult> 
inflightRecorder) {
         processAllGroup(info -> {
-            CalculateInflightResult result = new 
CalculateInflightResult(info.group, info.topic);
-            Pair<Long, Long> inFlight = getInFlightMsgStats(info.group, 
info.topic);
+            CalculateInflightResult result = new 
CalculateInflightResult(info.group, info.topic, false);
+            Pair<Long, Long> inFlight = getInFlightMsgStats(info.group, 
info.topic, info.isPop);
             if (inFlight != null) {
                 result.inFlight = inFlight.getObject1();
                 result.earliestUnPulledTimestamp = inFlight.getObject2();
             }
             inflightRecorder.accept(result);
+
+            if (info.isPop) {
+                Pair<Long, Long> retryInFlight = 
getInFlightMsgStats(info.group, info.retryTopic, true);
+
+                result = new CalculateInflightResult(info.group, info.topic, 
true);
+                if (retryInFlight != null) {
+                    result.inFlight = retryInFlight.getObject1();
+                    result.earliestUnPulledTimestamp = 
retryInFlight.getObject2();
+                }
+                inflightRecorder.accept(result);
+            }
         });
     }
 
     public void calculateAvailable(Consumer<CalculateAvailableResult> 
availableRecorder) {
         processAllGroup(info -> {
-            CalculateAvailableResult result = new 
CalculateAvailableResult(info.group, info.topic);
+            CalculateAvailableResult result = new 
CalculateAvailableResult(info.group, info.topic, false);
 
-            result.available = getAvailableMsgCount(info.group, info.topic);
+            result.available = getAvailableMsgCount(info.group, info.topic, 
info.isPop);
             availableRecorder.accept(result);
-        });
-    }
-
-    public void calculateSendToDLQ(Consumer<CalculateSendToDLQResult> 
dlqRecorder) {
-        processAllGroup(info -> {
-            CalculateSendToDLQResult result = new 
CalculateSendToDLQResult(info.group, info.topic);
 
-            String dlqTopic = MixAll.DLQ_GROUP_TOPIC_PREFIX + info.group;
-            TopicConfig topicConfig = 
topicConfigManager.selectTopicConfig(dlqTopic);
-            if (topicConfig == null) {
-                return;
-            }
+            if (info.isPop) {
+                long retryAvailable = getAvailableMsgCount(info.group, 
info.retryTopic, true);
 
-            ConsumeQueueInterface consumeQueue = 
messageStore.getConsumeQueue(dlqTopic, 0);
-            if (consumeQueue == null) {
-                return;
+                result = new CalculateAvailableResult(info.group, info.topic, 
true);
+                result.available = retryAvailable;
+                availableRecorder.accept(result);
             }
-
-            result.dlqMessageCount = consumeQueue.getMaxOffsetInQueue();
-            dlqRecorder.accept(result);
         });
     }
 
-    public Pair<Long, Long> getConsumerLagStats(String group, String topic) {
+    public Pair<Long, Long> getConsumerLagStats(String group, String topic, 
boolean isPop) {
         long total = 0L;
         long earliestUnconsumedTimestamp = Long.MAX_VALUE;
 
         TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
         if (topicConfig != null) {
             for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); 
queueId++) {
-                Pair<Long, Long> pair = getConsumerLagStats(group, topic, 
queueId);
+                Pair<Long, Long> pair = getConsumerLagStats(group, topic, 
queueId, isPop);
                 total += pair.getObject1();
                 earliestUnconsumedTimestamp = 
Math.min(earliestUnconsumedTimestamp, pair.getObject2());
             }
@@ -229,12 +267,27 @@ public class ConsumerLagCalculator {
         return new Pair<>(total, earliestUnconsumedTimestamp);
     }
 
-    public Pair<Long, Long> getConsumerLagStats(String group, String topic, 
int queueId) {
+    public Pair<Long, Long> getConsumerLagStats(String group, String topic, 
int queueId, boolean isPop) {
         long brokerOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
         if (brokerOffset < 0) {
             brokerOffset = 0;
         }
 
+        if (isPop) {
+            long pullOffset = popBufferMergeService.getLatestOffset(topic, 
group, queueId);
+            if (pullOffset < 0) {
+                pullOffset = offsetManager.queryOffset(group, topic, queueId);
+            }
+            if (pullOffset < 0) {
+                pullOffset = brokerOffset;
+            }
+            long inFlightNum = 
popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId);
+            long lag = calculateMessageCount(group, topic, queueId, 
pullOffset, brokerOffset) + inFlightNum;
+            long consumerOffset = pullOffset - inFlightNum;
+            long consumerStoreTimeStamp = getStoreTimeStamp(topic, queueId, 
consumerOffset);
+            return new Pair<>(lag, consumerStoreTimeStamp);
+        }
+
         long consumerOffset = offsetManager.queryOffset(group, topic, queueId);
         if (consumerOffset < 0) {
             consumerOffset = brokerOffset;
@@ -245,14 +298,14 @@ public class ConsumerLagCalculator {
         return new Pair<>(lag, consumerStoreTimeStamp);
     }
 
-    public Pair<Long, Long> getInFlightMsgStats(String group, String topic) {
+    public Pair<Long, Long> getInFlightMsgStats(String group, String topic, 
boolean isPop) {
         long total = 0L;
         long earliestUnPulledTimestamp = Long.MAX_VALUE;
 
         TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
         if (topicConfig != null) {
             for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); 
queueId++) {
-                Pair<Long, Long> pair = getInFlightMsgStats(group, topic, 
queueId);
+                Pair<Long, Long> pair = getInFlightMsgStats(group, topic, 
queueId, isPop);
                 total += pair.getObject1();
                 earliestUnPulledTimestamp = 
Math.min(earliestUnPulledTimestamp, pair.getObject2());
             }
@@ -267,7 +320,20 @@ public class ConsumerLagCalculator {
         return new Pair<>(total, earliestUnPulledTimestamp);
     }
 
-    public Pair<Long, Long> getInFlightMsgStats(String group, String topic, 
int queueId) {
+    public Pair<Long, Long> getInFlightMsgStats(String group, String topic, 
int queueId, boolean isPop) {
+        if (isPop) {
+            long inflight = 
popInflightMessageCounter.getGroupPopInFlightMessageNum(topic, group, queueId);
+            long pullOffset = popBufferMergeService.getLatestOffset(topic, 
group, queueId);
+            if (pullOffset < 0) {
+                pullOffset = offsetManager.queryOffset(group, topic, queueId);
+            }
+            if (pullOffset < 0) {
+                pullOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
+            }
+            long pullStoreTimeStamp = getStoreTimeStamp(topic, queueId, 
pullOffset);
+            return new Pair<>(inflight, pullStoreTimeStamp);
+        }
+
         long pullOffset = offsetManager.queryPullOffset(group, topic, queueId);
         if (pullOffset < 0) {
             pullOffset = 0;
@@ -283,13 +349,13 @@ public class ConsumerLagCalculator {
         return new Pair<>(inflight, pullStoreTimeStamp);
     }
 
-    public long getAvailableMsgCount(String group, String topic) {
+    public long getAvailableMsgCount(String group, String topic, boolean 
isPop) {
         long total = 0L;
 
         TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
         if (topicConfig != null) {
             for (int queueId = 0; queueId < topicConfig.getWriteQueueNums(); 
queueId++) {
-                total += getAvailableMsgCount(group, topic, queueId);
+                total += getAvailableMsgCount(group, topic, queueId, isPop);
             }
         } else {
             LOGGER.warn("failed to get config of topic {}", topic);
@@ -298,13 +364,24 @@ public class ConsumerLagCalculator {
         return total;
     }
 
-    public long getAvailableMsgCount(String group, String topic, int queueId) {
+    public long getAvailableMsgCount(String group, String topic, int queueId, 
boolean isPop) {
         long brokerOffset = messageStore.getMaxOffsetInQueue(topic, queueId);
         if (brokerOffset < 0) {
             brokerOffset = 0;
         }
 
-        long pullOffset = offsetManager.queryPullOffset(group, topic, queueId);
+        long pullOffset;
+        if (isPop) {
+            pullOffset = popBufferMergeService.getLatestOffset(topic, group, 
queueId);
+            if (pullOffset < 0) {
+                pullOffset = offsetManager.queryOffset(group, topic, queueId);
+            }
+            if (pullOffset < 0) {
+                pullOffset = brokerOffset;
+            }
+        } else {
+            pullOffset = offsetManager.queryPullOffset(group, topic, queueId);
+        }
         if (pullOffset < 0) {
             pullOffset = brokerOffset;
         }
@@ -323,6 +400,27 @@ public class ConsumerLagCalculator {
 
     public long calculateMessageCount(String group, String topic, int queueId, 
long from, long to) {
         long count = to - from;
+
+        if (brokerConfig.isEstimateAccumulation() && to > from) {
+            SubscriptionData subscriptionData = null;
+            ConsumerGroupInfo consumerGroupInfo = 
consumerManager.getConsumerGroupInfo(group, true);
+            if (consumerGroupInfo != null) {
+                subscriptionData = 
consumerGroupInfo.findSubscriptionData(topic);
+            }
+            if (null != subscriptionData &&
+                
ExpressionType.TAG.equalsIgnoreCase(subscriptionData.getExpressionType()) &&
+                
!SubscriptionData.SUB_ALL.equals(subscriptionData.getSubString())) {
+                count = messageStore.estimateMessageCount(topic, queueId, 
from, to,
+                    new DefaultMessageFilter(subscriptionData));
+            } else if (null != subscriptionData &&
+                
ExpressionType.SQL92.equalsIgnoreCase(subscriptionData.getExpressionType())) {
+                ConsumerFilterData consumerFilterData = 
consumerFilterManager.get(topic, group);
+                count = messageStore.estimateMessageCount(topic, queueId, 
from, to,
+                    new ExpressionMessageFilter(subscriptionData,
+                        consumerFilterData,
+                        consumerFilterManager));
+            }
+        }
         return count < 0 ? 0 : count;
     }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 9022f66ec..933a8984b 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -17,17 +17,19 @@
 package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.api.common.Attributes;
 import java.net.SocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.AbortProcessException;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
+import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
@@ -63,6 +65,10 @@ import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
 public abstract class AbstractSendMessageProcessor implements 
NettyRequestProcessor {
     protected static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     protected static final Logger DLQ_LOG = 
LoggerFactory.getLogger(LoggerName.DLQ_LOGGER_NAME);
@@ -184,6 +190,13 @@ public abstract class AbstractSendMessageProcessor 
implements NettyRequestProces
         if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
             || delayLevel < 0) {
 
+            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                .put(LABEL_CONSUMER_GROUP, requestHeader.getGroup())
+                .put(LABEL_TOPIC, requestHeader.getOriginTopic())
+                .put(LABEL_IS_SYSTEM, 
BrokerMetricsManager.isSystem(requestHeader.getOriginTopic(), 
requestHeader.getGroup()))
+                .build();
+            BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
+
             isDLQ = true;
             newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
             queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 5bb81df5a..26adbb094 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -369,6 +369,19 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                 response.setRemark("parse the consumer's subscription failed");
                 return response;
             }
+        } else {
+            try {
+                SubscriptionData subscriptionData = 
FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG);
+                
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
+                    requestHeader.getTopic(), subscriptionData);
+
+                String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
+                SubscriptionData retrySubscriptionData = 
FilterAPI.build(retryTopic, "*", ExpressionType.TAG);
+                
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
+                    retryTopic, retrySubscriptionData);
+            } catch (Exception e) {
+                POP_LOGGER.warn("Build default subscription error, group: {}", 
requestHeader.getConsumerGroup());
+            }
         }
 
         int randomQ = random.nextInt(100);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 6a2af0ddb..45517e1bb 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -26,8 +26,8 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
-import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -67,6 +67,7 @@ import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
@@ -185,6 +186,13 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 
: requestHeader.getReconsumeTimes();
             // Using '>' instead of '>=' to compatible with the case that 
reconsumeTimes here are increased by client.
             if (reconsumeTimes > maxReconsumeTimes) {
+                Attributes attributes = 
BrokerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_CONSUMER_GROUP, 
requestHeader.getProducerGroup())
+                    .put(LABEL_TOPIC, requestHeader.getTopic())
+                    .put(LABEL_IS_SYSTEM, 
BrokerMetricsManager.isSystem(requestHeader.getTopic(), 
requestHeader.getProducerGroup()))
+                    .build();
+                BrokerMetricsManager.sendToDlqMessages.add(1, attributes);
+
                 properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
                 newTopic = MixAll.getDLQTopic(groupName);
                 int queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
index bcaf5b01c..b5993222c 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
@@ -26,5 +26,4 @@ public class DefaultStoreMetricsConstant {
     public static final String DEFAULT_STORAGE_TYPE = "local";
     public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
     public static final String DEFAULT_STORAGE_MEDIUM = "disk";
-    public static final String LABEL_TOPIC = "topic";
 }

Reply via email to