This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9a5559240 [ISSUE #5994] [RIP-46] add pop and timer metrics (#5995)
9a5559240 is described below

commit 9a55592409fc7105c482bf36bb261e8cfb216c5a
Author: SSpirits <[email protected]>
AuthorDate: Wed Feb 8 15:03:08 2023 +0800

    [ISSUE #5994] [RIP-46] add pop and timer metrics (#5995)
    
    * add pop and timer metrics
    * fix according to review comment
---
 .../broker/metrics/BrokerMetricsManager.java       |   5 +
 .../broker/metrics/PopMetricsConstant.java         |  33 ++++
 .../rocketmq/broker/metrics/PopMetricsManager.java | 212 +++++++++++++++++++++
 .../broker/metrics/PopReviveMessageType.java       |  15 +-
 .../broker/processor/AckMessageProcessor.java      |   8 +-
 .../broker/processor/AdminBrokerProcessor.java     |  24 +--
 .../processor/ChangeInvisibleTimeProcessor.java    |  14 +-
 .../broker/processor/PopBufferMergeService.java    |  16 +-
 .../broker/processor/PopReviveService.java         |  39 +++-
 .../broker/processor/PopReviveServiceTest.java     |  22 ++-
 .../store/metrics/DefaultStoreMetricsConstant.java |  10 +
 .../store/metrics/DefaultStoreMetricsManager.java  |  86 +++++++++
 .../rocketmq/store/timer/TimerMessageStore.java    |  91 +++++----
 .../rocketmq/tieredstore/TieredDispatcher.java     |  11 +-
 .../metrics/TieredStoreMetricsConstant.java        |   2 +-
 .../metrics/TieredStoreMetricsManager.java         |  12 +-
 16 files changed, 509 insertions(+), 91 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 9fffb1eda..060b051ff 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -336,6 +336,10 @@ public class BrokerMetricsManager {
         for (Pair<InstrumentSelector, View> selectorViewPair : 
messageStore.getMetricsView()) {
             providerBuilder.registerView(selectorViewPair.getObject1(), 
selectorViewPair.getObject2());
         }
+
+        for (Pair<InstrumentSelector, View> selectorViewPair : 
PopMetricsManager.getMetricsView()) {
+            providerBuilder.registerView(selectorViewPair.getObject1(), 
selectorViewPair.getObject2());
+        }
     }
 
     private void initStatsMetrics() {
@@ -494,6 +498,7 @@ public class BrokerMetricsManager {
     private void initOtherMetrics() {
         RemotingMetricsManager.initMetrics(brokerMeter, 
BrokerMetricsManager::newAttributesBuilder);
         messageStore.initMetrics(brokerMeter, 
BrokerMetricsManager::newAttributesBuilder);
+        PopMetricsManager.initMetrics(brokerMeter, brokerController, 
BrokerMetricsManager::newAttributesBuilder);
     }
 
     public void shutdown() {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java
new file mode 100644
index 000000000..41917ed50
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.metrics;
+
+public class PopMetricsConstant {
+    public static final String HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME = 
"rocketmq_pop_buffer_scan_time_consume";
+    public static final String COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL = 
"rocketmq_pop_revive_in_message_total";
+    public static final String COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL = 
"rocketmq_pop_revive_out_message_total";
+    public static final String COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL = 
"rocketmq_pop_revive_retry_messages_total";
+
+    public static final String GAUGE_POP_REVIVE_LAG = 
"rocketmq_pop_revive_lag";
+    public static final String GAUGE_POP_REVIVE_LATENCY = 
"rocketmq_pop_revive_latency";
+    public static final String GAUGE_POP_OFFSET_BUFFER_SIZE = 
"rocketmq_pop_offset_buffer_size";
+    public static final String GAUGE_POP_CHECKPOINT_BUFFER_SIZE = 
"rocketmq_pop_checkpoint_buffer_size";
+
+    public static final String LABEL_REVIVE_MESSAGE_TYPE = 
"revive_message_type";
+    public static final String LABEL_PUT_STATUS = "put_status";
+    public static final String LABEL_QUEUE_ID = "queue_id";
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
new file mode 100644
index 000000000..463371d7e
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.processor.PopBufferMergeService;
+import org.apache.rocketmq.broker.processor.PopReviveService;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
+
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_CHECKPOINT_BUFFER_SIZE;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_OFFSET_BUFFER_SIZE;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_REVIVE_LAG;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_REVIVE_LATENCY;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_PUT_STATUS;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_QUEUE_ID;
+import static 
org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_REVIVE_MESSAGE_TYPE;
+
+public class PopMetricsManager {
+    public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+
+    private static LongHistogram popBufferScanTimeConsume = new 
NopLongHistogram();
+    private static LongCounter popRevivePutTotal = new NopLongCounter();
+    private static LongCounter popReviveGetTotal = new NopLongCounter();
+    private static LongCounter popReviveRetryMessageTotal = new 
NopLongCounter();
+
+    public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+        List<Double> rpcCostTimeBuckets = Arrays.asList(
+            (double) Duration.ofMillis(1).toMillis(),
+            (double) Duration.ofMillis(10).toMillis(),
+            (double) Duration.ofMillis(100).toMillis(),
+            (double) Duration.ofSeconds(1).toMillis(),
+            (double) Duration.ofSeconds(2).toMillis(),
+            (double) Duration.ofSeconds(3).toMillis()
+        );
+        InstrumentSelector popBufferScanTimeConsumeSelector = 
InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
+            .build();
+        View popBufferScanTimeConsumeView = View.builder()
+            
.setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
+            .build();
+        return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, 
popBufferScanTimeConsumeView));
+    }
+
+    public static void initMetrics(Meter meter, BrokerController 
brokerController,
+        Supplier<AttributesBuilder> attributesBuilderSupplier) {
+        PopMetricsManager.attributesBuilderSupplier = 
attributesBuilderSupplier;
+
+        popBufferScanTimeConsume = 
meter.histogramBuilder(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
+            .setDescription("Time consuming of pop buffer scan")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .build();
+        popRevivePutTotal = 
meter.counterBuilder(COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL)
+            .setDescription("Total number of put message to revive topic")
+            .build();
+        popReviveGetTotal = 
meter.counterBuilder(COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL)
+            .setDescription("Total number of get message from revive topic")
+            .build();
+        popReviveRetryMessageTotal = 
meter.counterBuilder(COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL)
+            .setDescription("Total number of put message to pop retry topic")
+            .build();
+
+        meter.gaugeBuilder(GAUGE_POP_OFFSET_BUFFER_SIZE)
+            .setDescription("Time number of buffered offset")
+            .ofLongs()
+            .buildWithCallback(measurement -> 
calculatePopBufferOffsetSize(brokerController, measurement));
+        meter.gaugeBuilder(GAUGE_POP_CHECKPOINT_BUFFER_SIZE)
+            .setDescription("The number of buffered checkpoint")
+            .ofLongs()
+            .buildWithCallback(measurement -> 
calculatePopBufferCkSize(brokerController, measurement));
+        meter.gaugeBuilder(GAUGE_POP_REVIVE_LAG)
+            .setDescription("The processing lag of revive topic")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .buildWithCallback(measurement -> 
calculatePopReviveLag(brokerController, measurement));
+        meter.gaugeBuilder(GAUGE_POP_REVIVE_LATENCY)
+            .setDescription("The processing latency of revive topic")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .buildWithCallback(measurement -> 
calculatePopReviveLatency(brokerController, measurement));
+    }
+
+    private static void calculatePopBufferOffsetSize(BrokerController 
brokerController,
+        ObservableLongMeasurement measurement) {
+        PopBufferMergeService popBufferMergeService = 
brokerController.getPopMessageProcessor().getPopBufferMergeService();
+        measurement.record(popBufferMergeService.getOffsetTotalSize(), 
newAttributesBuilder().build());
+    }
+
+    private static void calculatePopBufferCkSize(BrokerController 
brokerController,
+        ObservableLongMeasurement measurement) {
+        PopBufferMergeService popBufferMergeService = 
brokerController.getPopMessageProcessor().getPopBufferMergeService();
+        measurement.record(popBufferMergeService.getBufferedCKSize(), 
newAttributesBuilder().build());
+    }
+
+    private static void calculatePopReviveLatency(BrokerController 
brokerController,
+        ObservableLongMeasurement measurement) {
+        PopReviveService[] popReviveServices = 
brokerController.getAckMessageProcessor().getPopReviveServices();
+        for (PopReviveService popReviveService : popReviveServices) {
+            measurement.record(popReviveService.getReviveBehindMillis(), 
newAttributesBuilder()
+                .put(LABEL_QUEUE_ID, popReviveService.getQueueId())
+                .build());
+        }
+    }
+
+    private static void calculatePopReviveLag(BrokerController 
brokerController,
+        ObservableLongMeasurement measurement) {
+        PopReviveService[] popReviveServices = 
brokerController.getAckMessageProcessor().getPopReviveServices();
+        for (PopReviveService popReviveService : popReviveServices) {
+            measurement.record(popReviveService.getReviveBehindMessages(), 
newAttributesBuilder()
+                .put(LABEL_QUEUE_ID, popReviveService.getQueueId())
+                .build());
+        }
+    }
+
+    public static void incPopReviveAckPutCount(AckMsg ackMsg, PutMessageStatus 
status) {
+        incPopRevivePutCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(), 
PopReviveMessageType.ACK, status, 1);
+    }
+
+    public static void incPopReviveCkPutCount(PopCheckPoint checkPoint, 
PutMessageStatus status) {
+        incPopRevivePutCount(checkPoint.getCId(), checkPoint.getTopic(), 
PopReviveMessageType.CK, status, 1);
+    }
+
+    public static void incPopRevivePutCount(String group, String topic, 
PopReviveMessageType messageType,
+        PutMessageStatus status, int num) {
+        Attributes attributes = newAttributesBuilder()
+            .put(LABEL_CONSUMER_GROUP, group)
+            .put(LABEL_TOPIC, topic)
+            .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
+            .put(LABEL_PUT_STATUS, status.name())
+            .build();
+        popRevivePutTotal.add(num, attributes);
+    }
+
+    public static void incPopReviveAckGetCount(AckMsg ackMsg, int queueId) {
+        incPopReviveGetCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(), 
PopReviveMessageType.ACK, queueId, 1);
+    }
+
+    public static void incPopReviveCkGetCount(PopCheckPoint checkPoint, int 
queueId) {
+        incPopReviveGetCount(checkPoint.getCId(), checkPoint.getTopic(), 
PopReviveMessageType.CK, queueId, 1);
+    }
+
+    public static void incPopReviveGetCount(String group, String topic, 
PopReviveMessageType messageType, int queueId,
+        int num) {
+        AttributesBuilder builder = newAttributesBuilder();
+        Attributes attributes = builder
+            .put(LABEL_CONSUMER_GROUP, group)
+            .put(LABEL_TOPIC, topic)
+            .put(LABEL_QUEUE_ID, queueId)
+            .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
+            .build();
+        popReviveGetTotal.add(num, attributes);
+    }
+
+    public static void incPopReviveRetryMessageCount(PopCheckPoint checkPoint, 
PutMessageStatus status) {
+        AttributesBuilder builder = newAttributesBuilder();
+        Attributes attributes = builder
+            .put(LABEL_CONSUMER_GROUP, checkPoint.getCId())
+            .put(LABEL_TOPIC, checkPoint.getTopic())
+            .put(LABEL_PUT_STATUS, status.name())
+            .build();
+        popReviveRetryMessageTotal.add(1, attributes);
+    }
+
+    public static void recordPopBufferScanTimeConsume(long time) {
+        popBufferScanTimeConsume.record(time, newAttributesBuilder().build());
+    }
+
+    public static AttributesBuilder newAttributesBuilder() {
+        return attributesBuilderSupplier != null ? 
attributesBuilderSupplier.get() : Attributes.builder();
+    }
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
similarity index 51%
copy from 
store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
copy to 
broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
index b5993222c..3f6fe9c47 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
@@ -14,16 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.metrics;
+package org.apache.rocketmq.broker.metrics;
 
-public class DefaultStoreMetricsConstant {
-    public static final String GAUGE_STORAGE_SIZE = "rocketmq_storage_size";
-    public static final String GAUGE_STORAGE_FLUSH_BEHIND = 
"rocketmq_storage_flush_behind_bytes";
-    public static final String GAUGE_STORAGE_DISPATCH_BEHIND = 
"rocketmq_storage_dispatch_behind_bytes";
-    public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME = 
"rocketmq_storage_message_reserve_time";
-
-    public static final String LABEL_STORAGE_TYPE = "storage_type";
-    public static final String DEFAULT_STORAGE_TYPE = "local";
-    public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
-    public static final String DEFAULT_STORAGE_MEDIUM = "disk";
+public enum PopReviveMessageType {
+    CK,
+    ACK
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 2653de0f5..1985c22d6 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicConfig;
@@ -58,6 +59,10 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
         }
     }
 
+    public PopReviveService[] getPopReviveServices() {
+        return popReviveServices;
+    }
+
     public void startPopReviveService() {
         for (PopReviveService popReviveService : popReviveServices) {
             popReviveService.start();
@@ -159,7 +164,7 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
             }
             try {
                 oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
-                        requestHeader.getTopic(), requestHeader.getQueueId());
+                    requestHeader.getTopic(), requestHeader.getQueueId());
                 if (requestHeader.getOffset() < oldOffset) {
                     return response;
                 }
@@ -216,6 +221,7 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("put ack msg error:" + putMessageResult);
         }
+        PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
         decInFlightMessageNum(requestHeader);
         return response;
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 78f50c92b..17e1e86c9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1426,10 +1426,11 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx, 
RemotingCommand request) throws RemotingCommandException {
+    private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         final GetAllProducerInfoRequestHeader requestHeader =
-                (GetAllProducerInfoRequestHeader) 
request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
+            (GetAllProducerInfoRequestHeader) 
request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
 
         ProducerTableInfo producerTable = 
this.brokerController.getProducerManager().getProducerTable();
         if (producerTable != null) {
@@ -1443,6 +1444,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         response.setCode(ResponseCode.SYSTEM_ERROR);
         return response;
     }
+
     private RemotingCommand getProducerConnectionList(ChannelHandlerContext 
ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
@@ -1692,13 +1694,13 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
     /**
      * Reset consumer offset.
      *
-     * @param topic         Required, not null.
-     * @param group         Required, not null.
-     * @param queueId       if target queue ID is negative, all message queues 
will be reset;
-     *                      otherwise, only the target queue would get reset.
-     * @param timestamp     if timestamp is negative, offset would be reset to 
broker offset at the time being;
-     *                      otherwise, binary search is performed to locate 
target offset.
-     * @param offset        Target offset to reset to if target queue ID is 
properly provided.
+     * @param topic     Required, not null.
+     * @param group     Required, not null.
+     * @param queueId   if target queue ID is negative, all message queues 
will be reset;
+     *                  otherwise, only the target queue would get reset.
+     * @param timestamp if timestamp is negative, offset would be reset to 
broker offset at the time being;
+     *                  otherwise, binary search is performed to locate target 
offset.
+     * @param offset    Target offset to reset to if target queue ID is 
properly provided.
      * @return Affected queues and their new offset
      */
     private RemotingCommand resetOffsetInner(String topic, String group, int 
queueId, long timestamp, Long offset) {
@@ -2260,8 +2262,8 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         runtimeInfo.put("startAcceptSendRequestTimeStamp", 
String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
 
         if 
(this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
-            runtimeInfo.put("timerReadBehind", 
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getReadBehind()));
-            runtimeInfo.put("timerOffsetBehind", 
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getOffsetBehind()));
+            runtimeInfo.put("timerReadBehind", 
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind()));
+            runtimeInfo.put("timerOffsetBehind", 
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehindMessages()));
             runtimeInfo.put("timerCongestNum", 
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getAllCongestNum()));
             runtimeInfo.put("timerEnqueueTps", 
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueTps()));
             runtimeInfo.put("timerDequeueTps", 
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueTps()));
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 91e176f8c..f4a472028 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -127,7 +128,8 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    protected RemotingCommand 
processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader 
requestHeader, String[] extraInfo, RemotingCommand response, 
ChangeInvisibleTimeResponseHeader responseHeader) {
+    protected RemotingCommand 
processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader 
requestHeader,
+        String[] extraInfo, RemotingCommand response, 
ChangeInvisibleTimeResponseHeader responseHeader) {
         long popTime = ExtraInfoUtil.getPopTime(extraInfo);
         long oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
             requestHeader.getTopic(), requestHeader.getQueueId());
@@ -194,6 +196,7 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("change Invisible, put ack msg fail: {}, {}", 
ackMsg, putMessageResult);
         }
+        PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
     }
 
     private PutMessageResult appendCheckPoint(final 
ChangeInvisibleTimeRequestHeader requestHeader, int reviveQid,
@@ -229,9 +232,12 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
                 ck.getReviveTime(), putMessageResult);
         }
 
-        if (putMessageResult != null && putMessageResult.isOk()) {
-            this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
-            
this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(), 1);
+        if (putMessageResult != null) {
+            PopMetricsManager.incPopReviveCkPutCount(ck, 
putMessageResult.getPutMessageStatus());
+            if (putMessageResult.isOk()) {
+                
this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
+                
this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(), 1);
+            }
         }
 
         return putMessageResult;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 4167438e9..e933f5347 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -24,16 +24,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -104,7 +105,7 @@ public class PopBufferMergeService extends ServiceThread {
 
                 this.waitForRunning(interval);
 
-                if (!this.serving && this.buffer.size() == 0 && totalSize() == 
0) {
+                if (!this.serving && this.buffer.size() == 0 && 
getOffsetTotalSize() == 0) {
                     this.serving = true;
                 }
             } catch (Throwable e) {
@@ -121,7 +122,7 @@ public class PopBufferMergeService extends ServiceThread {
         if (!isShouldRunning()) {
             return;
         }
-        while (this.buffer.size() > 0 || totalSize() > 0) {
+        while (this.buffer.size() > 0 || getOffsetTotalSize() > 0) {
             scan();
         }
     }
@@ -304,6 +305,7 @@ public class PopBufferMergeService extends ServiceThread {
                     eclipse, count, countCk, counter.get(), offsetBufferSize);
             }
         }
+        PopMetricsManager.recordPopBufferScanTimeConsume(eclipse);
         scanTimes++;
 
         if (scanTimes >= countOfMinute1) {
@@ -312,7 +314,7 @@ public class PopBufferMergeService extends ServiceThread {
         }
     }
 
-    private int totalSize() {
+    public int getOffsetTotalSize() {
         int count = 0;
         Iterator<Map.Entry<String, QueueWithTime<PopCheckPointWrapper>>> 
iterator = this.commitOffsets.entrySet().iterator();
         while (iterator.hasNext()) {
@@ -323,6 +325,10 @@ public class PopBufferMergeService extends ServiceThread {
         return count;
     }
 
+    public int getBufferedCKSize() {
+        return this.counter.get();
+    }
+
     private void markBitCAS(AtomicInteger setBits, int index) {
         while (true) {
             int bits = setBits.get();
@@ -540,6 +546,7 @@ public class PopBufferMergeService extends ServiceThread {
         }
         MessageExtBrokerInner msgInner = 
popMessageProcessor.buildCkMsg(pointWrapper.getCk(), 
pointWrapper.getReviveQueueId());
         PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        PopMetricsManager.incPopReviveCkPutCount(pointWrapper.getCk(), 
putMessageResult.getPutMessageStatus());
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
@@ -584,6 +591,7 @@ public class PopBufferMergeService extends ServiceThread {
 
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
             && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 95aa52091..fe654fe64 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -66,6 +67,7 @@ public class PopReviveService extends ServiceThread {
     private int queueId;
     private BrokerController brokerController;
     private String reviveTopic;
+    private long currentReviveMessageTimestamp = -1;
     private volatile boolean shouldRunPopRevive = false;
 
     private final NavigableMap<PopCheckPoint/* oldCK */, Pair<Long/* timestamp 
*/, Boolean/* result */>> inflightReviveRequestMap = 
Collections.synchronizedNavigableMap(new TreeMap<>());
@@ -86,6 +88,10 @@ public class PopReviveService extends ServiceThread {
         return "PopReviveService_" + this.queueId;
     }
 
+    public int getQueueId() {
+        return queueId;
+    }
+
     public void setShouldRunPopRevive(final boolean shouldRunPopRevive) {
         this.shouldRunPopRevive = shouldRunPopRevive;
     }
@@ -120,6 +126,7 @@ public class PopReviveService extends ServiceThread {
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
         PutMessageResult putMessageResult = 
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, 
putMessageResult.getPutMessageStatus());
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId 
{}, offset {}, reviveDelay={}, result is {} ",
                 queueId, popCheckPoint, messageExt.getQueueId(), 
messageExt.getQueueOffset(),
@@ -197,11 +204,13 @@ public class PopReviveService extends ServiceThread {
             || pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL && 
offset == pullResult.getMaxOffset();
     }
 
-    private CompletableFuture<Pair<GetMessageStatus, MessageExt>> 
getBizMessage(String topic, long offset, int queueId, String brokerName) {
+    private CompletableFuture<Pair<GetMessageStatus, MessageExt>> 
getBizMessage(String topic, long offset, int queueId,
+        String brokerName) {
         return this.brokerController.getEscapeBridge().getMessageAsync(topic, 
offset, queueId, brokerName, false);
     }
 
-    public PullResult getMessage(String group, String topic, int queueId, long 
offset, int nums, boolean deCompressBody) {
+    public PullResult getMessage(String group, String topic, int queueId, long 
offset, int nums,
+        boolean deCompressBody) {
         GetMessageResult getMessageResult = 
this.brokerController.getMessageStore().getMessage(group, topic, queueId, 
offset, nums, null);
 
         if (getMessageResult != null) {
@@ -315,7 +324,7 @@ public class PopReviveService extends ServiceThread {
             List<MessageExt> messageExts = getReviveMessage(offset, queueId);
             if (messageExts == null || messageExts.isEmpty()) {
                 long old = endTime;
-                long timerDelay = 
brokerController.getMessageStore().getTimerMessageStore().getReadBehind();
+                long timerDelay = 
brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind();
                 long commitLogDelay = 
brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehind();
                 // move endTime
                 if (endTime != 0 && System.currentTimeMillis() - endTime > 3 * 
PopAckConstants.SECOND && timerDelay <= 0 && commitLogDelay <= 0) {
@@ -355,6 +364,7 @@ public class PopReviveService extends ServiceThread {
                         continue;
                     }
                     map.put(point.getTopic() + point.getCId() + 
point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
+                    PopMetricsManager.incPopReviveCkGetCount(point, queueId);
                     point.setReviveOffset(messageExt.getQueueOffset());
                     if (firstRt == 0) {
                         firstRt = point.getReviveTime();
@@ -365,6 +375,7 @@ public class PopReviveService extends ServiceThread {
                         POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, 
raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
                     }
                     AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
+                    PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
                     String mergeKey = ackMsg.getTopic() + 
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + 
ackMsg.getPopTime();
                     PopCheckPoint point = map.get(mergeKey);
                     if (point == null) {
@@ -555,6 +566,25 @@ public class PopReviveService extends ServiceThread {
         brokerController.getMessageStore().putMessage(ckMsg);
     }
 
+    public long getReviveBehindMillis() {
+        if (currentReviveMessageTimestamp <= 0) {
+            return 0;
+        }
+        long maxOffset = 
brokerController.getMessageStore().getMaxOffsetInQueue(reviveTopic, queueId);
+        if (maxOffset - reviveOffset > 1) {
+            return Math.max(0, System.currentTimeMillis() - 
currentReviveMessageTimestamp);
+        }
+        return 0;
+    }
+
+    public long getReviveBehindMessages() {
+        if (currentReviveMessageTimestamp <= 0) {
+            return 0;
+        }
+        long diff = 
brokerController.getMessageStore().getMaxOffsetInQueue(reviveTopic, queueId) - 
reviveOffset;
+        return Math.max(0, diff);
+    }
+
     @Override
     public void run() {
         int slow = 1;
@@ -586,7 +616,10 @@ public class PopReviveService extends ServiceThread {
                 long delay = 0;
                 if (sortList != null && !sortList.isEmpty()) {
                     delay = (System.currentTimeMillis() - 
sortList.get(0).getReviveTime()) / 1000;
+                    currentReviveMessageTimestamp = 
sortList.get(0).getReviveTime();
                     slow = 1;
+                } else {
+                    currentReviveMessageTimestamp = System.currentTimeMillis();
                 }
 
                 POP_LOGGER.info("reviveQueueId={},revive finish,old offset is 
{}, new offset is {}, ckDelay={}  ",
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index 79fe6d587..89ffed7e3 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -90,7 +90,7 @@ public class PopReviveServiceTest {
         
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
         
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
         
when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
-        when(timerMessageStore.getReadBehind()).thenReturn(0L);
+        when(timerMessageStore.getDequeueBehind()).thenReturn(0L);
         when(timerMessageStore.getEnqueueBehind()).thenReturn(0L);
 
         when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(new 
TopicConfig());
@@ -106,7 +106,7 @@ public class PopReviveServiceTest {
         long maxReviveOffset = 4;
 
         when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, 
REVIVE_TOPIC, REVIVE_QUEUE_ID))
-                .thenReturn(0L);
+            .thenReturn(0L);
         List<MessageExt> reviveMessageExtList = new ArrayList<>();
         long basePopTime = System.currentTimeMillis();
         {
@@ -249,14 +249,15 @@ public class PopReviveServiceTest {
         return msgInner;
     }
 
-    public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long 
deliverMs, long reviveOffset, long deliverTime) {
+    public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long 
deliverMs, long reviveOffset,
+        long deliverTime) {
         MessageExtBrokerInner messageExtBrokerInner = buildAckInnerMessage(
-                REVIVE_TOPIC,
-                ackMsg,
-                REVIVE_QUEUE_ID,
-                STORE_HOST,
-                deliverMs,
-                PopMessageProcessor.genAckUniqueId(ackMsg)
+            REVIVE_TOPIC,
+            ackMsg,
+            REVIVE_QUEUE_ID,
+            STORE_HOST,
+            deliverMs,
+            PopMessageProcessor.genAckUniqueId(ackMsg)
         );
         messageExtBrokerInner.setQueueOffset(reviveOffset);
         messageExtBrokerInner.setDeliverTimeMs(deliverMs);
@@ -264,7 +265,8 @@ public class PopReviveServiceTest {
         return messageExtBrokerInner;
     }
 
-    public static MessageExtBrokerInner buildAckInnerMessage(String 
reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs, 
String ackUniqueId) {
+    public static MessageExtBrokerInner buildAckInnerMessage(String 
reviveTopic, AckMsg ackMsg, int reviveQid,
+        SocketAddress host, long deliverMs, String ackUniqueId) {
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         msgInner.setTopic(reviveTopic);
         
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
index b5993222c..271604b1e 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
@@ -22,8 +22,18 @@ public class DefaultStoreMetricsConstant {
     public static final String GAUGE_STORAGE_DISPATCH_BEHIND = 
"rocketmq_storage_dispatch_behind_bytes";
     public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME = 
"rocketmq_storage_message_reserve_time";
 
+    public static final String GAUGE_TIMER_ENQUEUE_LAG = 
"rocketmq_timer_enqueue_lag";
+    public static final String GAUGE_TIMER_ENQUEUE_LATENCY = 
"rocketmq_timer_enqueue_latency";
+    public static final String GAUGE_TIMER_DEQUEUE_LAG = 
"rocketmq_timer_dequeue_lag";
+    public static final String GAUGE_TIMER_DEQUEUE_LATENCY = 
"rocketmq_timer_dequeue_latency";
+    public static final String GAUGE_TIMING_MESSAGES = 
"rocketmq_timing_messages";
+
+    public static final String COUNTER_TIMER_ENQUEUE_TOTAL = 
"rocketmq_timer_enqueue_total";
+    public static final String COUNTER_TIMER_DEQUEUE_TOTAL = 
"rocketmq_timer_dequeue_total";
+
     public static final String LABEL_STORAGE_TYPE = "storage_type";
     public static final String DEFAULT_STORAGE_TYPE = "local";
     public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
     public static final String DEFAULT_STORAGE_MEDIUM = "disk";
+    public static final String LABEL_TOPIC = "topic";
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index 020974a22..686265292 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store.metrics;
 
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.ObservableLongGauge;
 import io.opentelemetry.sdk.metrics.InstrumentSelector;
@@ -27,18 +28,28 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
 import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
 import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
 
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_DEQUEUE_TOTAL;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_ENQUEUE_TOTAL;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_MEDIUM;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_TYPE;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_DISPATCH_BEHIND;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_FLUSH_BEHIND;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_MESSAGE_RESERVE_TIME;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_SIZE;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LAG;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LATENCY;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LAG;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LATENCY;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMING_MESSAGES;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM;
 import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE;
+import static 
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
 
 public class DefaultStoreMetricsManager {
     public static Supplier<AttributesBuilder> attributesBuilderSupplier;
@@ -49,6 +60,15 @@ public class DefaultStoreMetricsManager {
     public static ObservableLongGauge dispatchBehind = new 
NopObservableLongGauge();
     public static ObservableLongGauge messageReserveTime = new 
NopObservableLongGauge();
 
+    public static ObservableLongGauge timerEnqueueLag = new 
NopObservableLongGauge();
+    public static ObservableLongGauge timerEnqueueLatency = new 
NopObservableLongGauge();
+    public static ObservableLongGauge timerDequeueLag = new 
NopObservableLongGauge();
+    public static ObservableLongGauge timerDequeueLatency = new 
NopObservableLongGauge();
+    public static ObservableLongGauge timingMessages = new 
NopObservableLongGauge();
+
+    public static LongCounter timerDequeueTotal = new NopLongCounter();
+    public static LongCounter timerEnqueueTotal = new NopLongCounter();
+
     public static List<Pair<InstrumentSelector, View>> getMetricsView() {
         return Collections.emptyList();
     }
@@ -95,6 +115,72 @@ public class DefaultStoreMetricsManager {
                 }
                 measurement.record(System.currentTimeMillis() - 
earliestMessageTime, newAttributesBuilder().build());
             });
+
+        timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
+            .setDescription("Timer enqueue messages lag")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                
measurement.record(timerMessageStore.getEnqueueBehindMessages(), 
newAttributesBuilder().build());
+            });
+
+        timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
+            .setDescription("Timer enqueue latency")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                measurement.record(timerMessageStore.getEnqueueBehindMillis(), 
newAttributesBuilder().build());
+            });
+        timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
+            .setDescription("Timer dequeue messages lag")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                
measurement.record(timerMessageStore.getDequeueBehindMessages(), 
newAttributesBuilder().build());
+            });
+        timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
+            .setDescription("Timer dequeue latency")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                measurement.record(timerMessageStore.getDequeueBehind(), 
newAttributesBuilder().build());
+            });
+        timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
+            .setDescription("Current message number in timing")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = 
messageStore.getTimerMessageStore();
+                timerMessageStore.getTimerMetrics()
+                    .getTimingCount()
+                    .forEach((topic, metric) -> {
+                        measurement.record(
+                            metric.getCount().get(),
+                            newAttributesBuilder().put(LABEL_TOPIC, 
topic).build()
+                        );
+                    });
+            });
+        timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
+            .setDescription("Total number of timer dequeue")
+            .build();
+        timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
+            .setDescription("Total number of timer enqueue")
+            .build();
+    }
+
+    public static void incTimerDequeueCount(String topic) {
+        timerDequeueTotal.add(1, newAttributesBuilder()
+            .put(LABEL_TOPIC, topic)
+            .build());
+    }
+
+    public static void incTimerEnqueueCount(String topic) {
+        AttributesBuilder attributesBuilder = newAttributesBuilder();
+        if (topic != null) {
+            attributesBuilder.put(LABEL_TOPIC, topic);
+        }
+        timerEnqueueTotal.add(1, attributesBuilder.build());
     }
 
     public static AttributesBuilder newAttributesBuilder() {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index c4f7e6c77..c6ab81df4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -17,32 +17,6 @@
 package org.apache.rocketmq.store.timer;
 
 import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
-import java.util.function.Function;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.TopicFilterType;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.PutMessageResult;
-import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.store.config.BrokerRole;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -66,6 +40,32 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.apache.rocketmq.store.util.PerfCounter;
 
 public class TimerMessageStore {
@@ -1106,6 +1106,13 @@ public class TimerMessageStore {
         return msgInner;
     }
 
+    private String getRealTopic(MessageExt msgExt) {
+        if (msgExt == null) {
+            return null;
+        }
+        return msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
+    }
+
     private long formatTimeMs(long timeMs) {
         return timeMs / precisionMs * precisionMs;
     }
@@ -1300,6 +1307,7 @@ public class TimerMessageStore {
                             req.setLatch(latch);
                             try {
                                 perfs.startTick("enqueue_put");
+                                
DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
                                 if (shouldRunningDequeue && req.getDelayTime() 
< currWriteTimeMs) {
                                     dequeuePutQueue.put(req);
                                 } else {
@@ -1414,6 +1422,7 @@ public class TimerMessageStore {
                             }
                             try {
                                 perfs.startTick("dequeue_put");
+                                
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg()));
                                 addMetric(tr.getMsg(), -1);
                                 MessageExtBrokerInner msg = 
convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
                                 doRes = PUT_NEED_RETRY != doPut(msg, 
needRoll(tr.getMagic()));
@@ -1600,7 +1609,7 @@ public class TimerMessageStore {
                         TimerMessageStore.LOGGER.info("[{}]Timer 
progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} 
currReadOffset:{} offsetBehind:{} behindMaster:{} " +
                                 "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} 
allCongestNum:{} enqExpiredStoreTime:{}",
                             storeConfig.getBrokerRole(),
-                            format(commitReadTimeMs), format(currReadTimeMs), 
format(currWriteTimeMs), getReadBehind(),
+                            format(commitReadTimeMs), format(currReadTimeMs), 
format(currWriteTimeMs), getDequeueBehind(),
                             tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset, 
timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
                             enqueuePutQueue.size(), dequeueGetQueue.size(), 
dequeuePutQueue.size(), getAllCongestNum(), 
format(lastEnqueueButExpiredStoreTime));
                     }
@@ -1636,22 +1645,34 @@ public class TimerMessageStore {
         return false;
     }
 
-    public long getEnqueueBehind() {
+    public long getEnqueueBehindMessages() {
+        long tmpQueueOffset = currQueueOffset;
+        ConsumeQueue cq = (ConsumeQueue) 
messageStore.getConsumeQueue(TIMER_TOPIC, 0);
+        long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue();
+        return maxOffsetInQueue - tmpQueueOffset;
+    }
+
+    public long getEnqueueBehindMillis() {
         if (System.currentTimeMillis() - lastEnqueueButExpiredTime < 2000) {
             return (System.currentTimeMillis() - 
lastEnqueueButExpiredStoreTime) / 1000;
         }
         return 0;
     }
 
-    public long getReadBehind() {
-        return (System.currentTimeMillis() - currReadTimeMs) / 1000;
+    public long getEnqueueBehind() {
+        return getEnqueueBehindMillis() / 1000;
     }
 
-    public long getOffsetBehind() {
-        long tmpQueueOffset = currQueueOffset;
-        ConsumeQueue cq = (ConsumeQueue) 
messageStore.getConsumeQueue(TIMER_TOPIC, 0);
-        long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue();
-        return maxOffsetInQueue - tmpQueueOffset;
+    public long getDequeueBehindMessages() {
+        return timerWheel.getAllNum(currReadTimeMs);
+    }
+
+    public long getDequeueBehindMillis() {
+        return System.currentTimeMillis() - currReadTimeMs;
+    }
+
+    public long getDequeueBehind() {
+        return getDequeueBehindMillis() / 1000;
     }
 
     public float getEnqueueTps() {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 1d7aeae38..0b1194953 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -166,7 +166,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 if (result == AppendResult.SUCCESS) {
                     Attributes attributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                         .put(TieredStoreMetricsConstant.LABEL_TOPIC, 
request.getTopic())
-                        .put(TieredStoreMetricsConstant.LABEL_QUEUE, 
request.getQueueId())
+                        .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, 
request.getQueueId())
                         .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, 
TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
                         .build();
                     TieredStoreMetricsManager.messagesDispatchTotal.add(1, 
attributes);
@@ -271,7 +271,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
             }
             Attributes attributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                 .put(TieredStoreMetricsConstant.LABEL_TOPIC, mq.getTopic())
-                .put(TieredStoreMetricsConstant.LABEL_QUEUE, mq.getQueueId())
+                .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, 
mq.getQueueId())
                 .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, 
TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
                 .build();
             TieredStoreMetricsManager.messagesDispatchTotal.add(queueOffset - 
beforeOffset, attributes);
@@ -290,7 +290,8 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
         }
     }
 
-    public void handleAppendCommitLogResult(AppendResult result, 
TieredMessageQueueContainer container, long queueOffset,
+    public void handleAppendCommitLogResult(AppendResult result, 
TieredMessageQueueContainer container,
+        long queueOffset,
         long dispatchOffset, long newCommitLogOffset, int size, long tagCode, 
ByteBuffer message) {
         MessageQueue mq = container.getMessageQueue();
         String topic = mq.getTopic();
@@ -449,7 +450,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
         cqMetricsMap.forEach((messageQueue, count) -> {
             Attributes attributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                 .put(TieredStoreMetricsConstant.LABEL_TOPIC, 
messageQueue.getTopic())
-                .put(TieredStoreMetricsConstant.LABEL_QUEUE, 
messageQueue.getQueueId())
+                .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, 
messageQueue.getQueueId())
                 .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, 
TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
                 .build();
             TieredStoreMetricsManager.messagesDispatchTotal.add(count, 
attributes);
@@ -457,7 +458,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
         ifMetricsMap.forEach((messageQueue, count) -> {
             Attributes attributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                 .put(TieredStoreMetricsConstant.LABEL_TOPIC, 
messageQueue.getTopic())
-                .put(TieredStoreMetricsConstant.LABEL_QUEUE, 
messageQueue.getQueueId())
+                .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, 
messageQueue.getQueueId())
                 .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, 
TieredFileSegment.FileSegmentType.INDEX.name().toLowerCase())
                 .build();
             TieredStoreMetricsManager.messagesDispatchTotal.add(count, 
attributes);
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
index 3029d5dd5..ad7281510 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
@@ -40,7 +40,7 @@ public class TieredStoreMetricsConstant {
 
     public static final String LABEL_TOPIC = "topic";
     public static final String LABEL_GROUP = "group";
-    public static final String LABEL_QUEUE = "queue";
+    public static final String LABEL_QUEUE_ID = "queue_id";
     public static final String LABEL_FILE_TYPE = "file_type";
 
     // blob constants
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 69fe28047..0b0dfd63a 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -70,7 +70,7 @@ import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant
 import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.HISTOGRAM_PROVIDER_RPC_LATENCY;
 import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.HISTOGRAM_UPLOAD_BYTES;
 import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
-import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE;
+import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE_ID;
 import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC;
 import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.STORAGE_MEDIUM_BLOB;
 
@@ -181,13 +181,13 @@ public class TieredStoreMetricsManager {
 
                     Attributes commitLogAttributes = newAttributesBuilder()
                         .put(LABEL_TOPIC, mq.getTopic())
-                        .put(LABEL_QUEUE, mq.getQueueId())
+                        .put(LABEL_QUEUE_ID, mq.getQueueId())
                         .put(LABEL_FILE_TYPE, 
TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
                         .build();
                     measurement.record(Math.max(maxOffset - 
container.getDispatchOffset(), 0), commitLogAttributes);
                     Attributes consumeQueueAttributes = newAttributesBuilder()
                         .put(LABEL_TOPIC, mq.getTopic())
-                        .put(LABEL_QUEUE, mq.getQueueId())
+                        .put(LABEL_QUEUE_ID, mq.getQueueId())
                         .put(LABEL_FILE_TYPE, 
TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
                         .build();
                     measurement.record(Math.max(maxOffset - 
container.getConsumeQueueMaxOffset(), 0), consumeQueueAttributes);
@@ -209,7 +209,7 @@ public class TieredStoreMetricsManager {
 
                     Attributes commitLogAttributes = newAttributesBuilder()
                         .put(LABEL_TOPIC, mq.getTopic())
-                        .put(LABEL_QUEUE, mq.getQueueId())
+                        .put(LABEL_QUEUE_ID, mq.getQueueId())
                         .put(LABEL_FILE_TYPE, 
TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
                         .build();
                     long commitLogDispatchLatency = 
next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), 
container.getDispatchOffset());
@@ -221,7 +221,7 @@ public class TieredStoreMetricsManager {
 
                     Attributes consumeQueueAttributes = newAttributesBuilder()
                         .put(LABEL_TOPIC, mq.getTopic())
-                        .put(LABEL_QUEUE, mq.getQueueId())
+                        .put(LABEL_QUEUE_ID, mq.getQueueId())
                         .put(LABEL_FILE_TYPE, 
TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
                         .build();
                     long consumeQueueDispatchOffset = 
container.getConsumeQueueMaxOffset();
@@ -307,7 +307,7 @@ public class TieredStoreMetricsManager {
                         MessageQueue mq = container.getMessageQueue();
                         Attributes attributes = newAttributesBuilder()
                             .put(LABEL_TOPIC, mq.getTopic())
-                            .put(LABEL_QUEUE, mq.getQueueId())
+                            .put(LABEL_QUEUE_ID, mq.getQueueId())
                             .build();
                         measurement.record(System.currentTimeMillis() - 
timestamp, attributes);
                     }

Reply via email to