This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9a5559240 [ISSUE #5994] [RIP-46] add pop and timer metrics (#5995)
9a5559240 is described below
commit 9a55592409fc7105c482bf36bb261e8cfb216c5a
Author: SSpirits <[email protected]>
AuthorDate: Wed Feb 8 15:03:08 2023 +0800
[ISSUE #5994] [RIP-46] add pop and timer metrics (#5995)
* add pop and timer metrics
* fix according to review comment
---
.../broker/metrics/BrokerMetricsManager.java | 5 +
.../broker/metrics/PopMetricsConstant.java | 33 ++++
.../rocketmq/broker/metrics/PopMetricsManager.java | 212 +++++++++++++++++++++
.../broker/metrics/PopReviveMessageType.java | 15 +-
.../broker/processor/AckMessageProcessor.java | 8 +-
.../broker/processor/AdminBrokerProcessor.java | 24 +--
.../processor/ChangeInvisibleTimeProcessor.java | 14 +-
.../broker/processor/PopBufferMergeService.java | 16 +-
.../broker/processor/PopReviveService.java | 39 +++-
.../broker/processor/PopReviveServiceTest.java | 22 ++-
.../store/metrics/DefaultStoreMetricsConstant.java | 10 +
.../store/metrics/DefaultStoreMetricsManager.java | 86 +++++++++
.../rocketmq/store/timer/TimerMessageStore.java | 91 +++++----
.../rocketmq/tieredstore/TieredDispatcher.java | 11 +-
.../metrics/TieredStoreMetricsConstant.java | 2 +-
.../metrics/TieredStoreMetricsManager.java | 12 +-
16 files changed, 509 insertions(+), 91 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 9fffb1eda..060b051ff 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -336,6 +336,10 @@ public class BrokerMetricsManager {
for (Pair<InstrumentSelector, View> selectorViewPair :
messageStore.getMetricsView()) {
providerBuilder.registerView(selectorViewPair.getObject1(),
selectorViewPair.getObject2());
}
+
+ for (Pair<InstrumentSelector, View> selectorViewPair :
PopMetricsManager.getMetricsView()) {
+ providerBuilder.registerView(selectorViewPair.getObject1(),
selectorViewPair.getObject2());
+ }
}
private void initStatsMetrics() {
@@ -494,6 +498,7 @@ public class BrokerMetricsManager {
private void initOtherMetrics() {
RemotingMetricsManager.initMetrics(brokerMeter,
BrokerMetricsManager::newAttributesBuilder);
messageStore.initMetrics(brokerMeter,
BrokerMetricsManager::newAttributesBuilder);
+ PopMetricsManager.initMetrics(brokerMeter, brokerController,
BrokerMetricsManager::newAttributesBuilder);
}
public void shutdown() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java
new file mode 100644
index 000000000..41917ed50
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.metrics;
+
+public class PopMetricsConstant {
+ public static final String HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME =
"rocketmq_pop_buffer_scan_time_consume";
+ public static final String COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL =
"rocketmq_pop_revive_in_message_total";
+ public static final String COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL =
"rocketmq_pop_revive_out_message_total";
+ public static final String COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL =
"rocketmq_pop_revive_retry_messages_total";
+
+ public static final String GAUGE_POP_REVIVE_LAG =
"rocketmq_pop_revive_lag";
+ public static final String GAUGE_POP_REVIVE_LATENCY =
"rocketmq_pop_revive_latency";
+ public static final String GAUGE_POP_OFFSET_BUFFER_SIZE =
"rocketmq_pop_offset_buffer_size";
+ public static final String GAUGE_POP_CHECKPOINT_BUFFER_SIZE =
"rocketmq_pop_checkpoint_buffer_size";
+
+ public static final String LABEL_REVIVE_MESSAGE_TYPE =
"revive_message_type";
+ public static final String LABEL_PUT_STATUS = "put_status";
+ public static final String LABEL_QUEUE_ID = "queue_id";
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
new file mode 100644
index 000000000..463371d7e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.processor.PopBufferMergeService;
+import org.apache.rocketmq.broker.processor.PopReviveService;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
+
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_CHECKPOINT_BUFFER_SIZE;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_OFFSET_BUFFER_SIZE;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_REVIVE_LAG;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_REVIVE_LATENCY;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_PUT_STATUS;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_QUEUE_ID;
+import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_REVIVE_MESSAGE_TYPE;
+
+public class PopMetricsManager {
+ public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+
+ private static LongHistogram popBufferScanTimeConsume = new
NopLongHistogram();
+ private static LongCounter popRevivePutTotal = new NopLongCounter();
+ private static LongCounter popReviveGetTotal = new NopLongCounter();
+ private static LongCounter popReviveRetryMessageTotal = new
NopLongCounter();
+
+ public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+ List<Double> rpcCostTimeBuckets = Arrays.asList(
+ (double) Duration.ofMillis(1).toMillis(),
+ (double) Duration.ofMillis(10).toMillis(),
+ (double) Duration.ofMillis(100).toMillis(),
+ (double) Duration.ofSeconds(1).toMillis(),
+ (double) Duration.ofSeconds(2).toMillis(),
+ (double) Duration.ofSeconds(3).toMillis()
+ );
+ InstrumentSelector popBufferScanTimeConsumeSelector =
InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
+ .build();
+ View popBufferScanTimeConsumeView = View.builder()
+
.setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
+ .build();
+ return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector,
popBufferScanTimeConsumeView));
+ }
+
+ public static void initMetrics(Meter meter, BrokerController
brokerController,
+ Supplier<AttributesBuilder> attributesBuilderSupplier) {
+ PopMetricsManager.attributesBuilderSupplier =
attributesBuilderSupplier;
+
+ popBufferScanTimeConsume =
meter.histogramBuilder(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
+ .setDescription("Time consuming of pop buffer scan")
+ .setUnit("milliseconds")
+ .ofLongs()
+ .build();
+ popRevivePutTotal =
meter.counterBuilder(COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL)
+ .setDescription("Total number of put message to revive topic")
+ .build();
+ popReviveGetTotal =
meter.counterBuilder(COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL)
+ .setDescription("Total number of get message from revive topic")
+ .build();
+ popReviveRetryMessageTotal =
meter.counterBuilder(COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL)
+ .setDescription("Total number of put message to pop retry topic")
+ .build();
+
+ meter.gaugeBuilder(GAUGE_POP_OFFSET_BUFFER_SIZE)
+ .setDescription("Time number of buffered offset")
+ .ofLongs()
+ .buildWithCallback(measurement ->
calculatePopBufferOffsetSize(brokerController, measurement));
+ meter.gaugeBuilder(GAUGE_POP_CHECKPOINT_BUFFER_SIZE)
+ .setDescription("The number of buffered checkpoint")
+ .ofLongs()
+ .buildWithCallback(measurement ->
calculatePopBufferCkSize(brokerController, measurement));
+ meter.gaugeBuilder(GAUGE_POP_REVIVE_LAG)
+ .setDescription("The processing lag of revive topic")
+ .setUnit("milliseconds")
+ .ofLongs()
+ .buildWithCallback(measurement ->
calculatePopReviveLag(brokerController, measurement));
+ meter.gaugeBuilder(GAUGE_POP_REVIVE_LATENCY)
+ .setDescription("The processing latency of revive topic")
+ .setUnit("milliseconds")
+ .ofLongs()
+ .buildWithCallback(measurement ->
calculatePopReviveLatency(brokerController, measurement));
+ }
+
+ private static void calculatePopBufferOffsetSize(BrokerController
brokerController,
+ ObservableLongMeasurement measurement) {
+ PopBufferMergeService popBufferMergeService =
brokerController.getPopMessageProcessor().getPopBufferMergeService();
+ measurement.record(popBufferMergeService.getOffsetTotalSize(),
newAttributesBuilder().build());
+ }
+
+ private static void calculatePopBufferCkSize(BrokerController
brokerController,
+ ObservableLongMeasurement measurement) {
+ PopBufferMergeService popBufferMergeService =
brokerController.getPopMessageProcessor().getPopBufferMergeService();
+ measurement.record(popBufferMergeService.getBufferedCKSize(),
newAttributesBuilder().build());
+ }
+
+ private static void calculatePopReviveLatency(BrokerController
brokerController,
+ ObservableLongMeasurement measurement) {
+ PopReviveService[] popReviveServices =
brokerController.getAckMessageProcessor().getPopReviveServices();
+ for (PopReviveService popReviveService : popReviveServices) {
+ measurement.record(popReviveService.getReviveBehindMillis(),
newAttributesBuilder()
+ .put(LABEL_QUEUE_ID, popReviveService.getQueueId())
+ .build());
+ }
+ }
+
+ private static void calculatePopReviveLag(BrokerController
brokerController,
+ ObservableLongMeasurement measurement) {
+ PopReviveService[] popReviveServices =
brokerController.getAckMessageProcessor().getPopReviveServices();
+ for (PopReviveService popReviveService : popReviveServices) {
+ measurement.record(popReviveService.getReviveBehindMessages(),
newAttributesBuilder()
+ .put(LABEL_QUEUE_ID, popReviveService.getQueueId())
+ .build());
+ }
+ }
+
+ public static void incPopReviveAckPutCount(AckMsg ackMsg, PutMessageStatus
status) {
+ incPopRevivePutCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(),
PopReviveMessageType.ACK, status, 1);
+ }
+
+ public static void incPopReviveCkPutCount(PopCheckPoint checkPoint,
PutMessageStatus status) {
+ incPopRevivePutCount(checkPoint.getCId(), checkPoint.getTopic(),
PopReviveMessageType.CK, status, 1);
+ }
+
+ public static void incPopRevivePutCount(String group, String topic,
PopReviveMessageType messageType,
+ PutMessageStatus status, int num) {
+ Attributes attributes = newAttributesBuilder()
+ .put(LABEL_CONSUMER_GROUP, group)
+ .put(LABEL_TOPIC, topic)
+ .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
+ .put(LABEL_PUT_STATUS, status.name())
+ .build();
+ popRevivePutTotal.add(num, attributes);
+ }
+
+ public static void incPopReviveAckGetCount(AckMsg ackMsg, int queueId) {
+ incPopReviveGetCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(),
PopReviveMessageType.ACK, queueId, 1);
+ }
+
+ public static void incPopReviveCkGetCount(PopCheckPoint checkPoint, int
queueId) {
+ incPopReviveGetCount(checkPoint.getCId(), checkPoint.getTopic(),
PopReviveMessageType.CK, queueId, 1);
+ }
+
+ public static void incPopReviveGetCount(String group, String topic,
PopReviveMessageType messageType, int queueId,
+ int num) {
+ AttributesBuilder builder = newAttributesBuilder();
+ Attributes attributes = builder
+ .put(LABEL_CONSUMER_GROUP, group)
+ .put(LABEL_TOPIC, topic)
+ .put(LABEL_QUEUE_ID, queueId)
+ .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
+ .build();
+ popReviveGetTotal.add(num, attributes);
+ }
+
+ public static void incPopReviveRetryMessageCount(PopCheckPoint checkPoint,
PutMessageStatus status) {
+ AttributesBuilder builder = newAttributesBuilder();
+ Attributes attributes = builder
+ .put(LABEL_CONSUMER_GROUP, checkPoint.getCId())
+ .put(LABEL_TOPIC, checkPoint.getTopic())
+ .put(LABEL_PUT_STATUS, status.name())
+ .build();
+ popReviveRetryMessageTotal.add(1, attributes);
+ }
+
+ public static void recordPopBufferScanTimeConsume(long time) {
+ popBufferScanTimeConsume.record(time, newAttributesBuilder().build());
+ }
+
+ public static AttributesBuilder newAttributesBuilder() {
+ return attributesBuilderSupplier != null ?
attributesBuilderSupplier.get() : Attributes.builder();
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
similarity index 51%
copy from store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
copy to broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
index b5993222c..3f6fe9c47 100644
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
@@ -14,16 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.store.metrics;
+package org.apache.rocketmq.broker.metrics;
-public class DefaultStoreMetricsConstant {
- public static final String GAUGE_STORAGE_SIZE = "rocketmq_storage_size";
- public static final String GAUGE_STORAGE_FLUSH_BEHIND =
"rocketmq_storage_flush_behind_bytes";
- public static final String GAUGE_STORAGE_DISPATCH_BEHIND =
"rocketmq_storage_dispatch_behind_bytes";
- public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME =
"rocketmq_storage_message_reserve_time";
-
- public static final String LABEL_STORAGE_TYPE = "storage_type";
- public static final String DEFAULT_STORAGE_TYPE = "local";
- public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
- public static final String DEFAULT_STORAGE_MEDIUM = "disk";
+public enum PopReviveMessageType {
+ CK,
+ ACK
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 2653de0f5..1985c22d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
@@ -58,6 +59,10 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
}
}
+ public PopReviveService[] getPopReviveServices() {
+ return popReviveServices;
+ }
+
public void startPopReviveService() {
for (PopReviveService popReviveService : popReviveServices) {
popReviveService.start();
@@ -159,7 +164,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
}
try {
oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
- requestHeader.getTopic(), requestHeader.getQueueId());
+ requestHeader.getTopic(), requestHeader.getQueueId());
if (requestHeader.getOffset() < oldOffset) {
return response;
}
@@ -216,6 +221,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("put ack msg error:" + putMessageResult);
}
+ PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
decInFlightMessageNum(requestHeader);
return response;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 78f50c92b..17e1e86c9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1426,10 +1426,11 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
- private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
+ private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
final GetAllProducerInfoRequestHeader requestHeader =
- (GetAllProducerInfoRequestHeader)
request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
+ (GetAllProducerInfoRequestHeader)
request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
ProducerTableInfo producerTable =
this.brokerController.getProducerManager().getProducerTable();
if (producerTable != null) {
@@ -1443,6 +1444,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
response.setCode(ResponseCode.SYSTEM_ERROR);
return response;
}
+
private RemotingCommand getProducerConnectionList(ChannelHandlerContext
ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
@@ -1692,13 +1694,13 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
/**
* Reset consumer offset.
*
- * @param topic Required, not null.
- * @param group Required, not null.
- * @param queueId if target queue ID is negative, all message queues
will be reset;
- * otherwise, only the target queue would get reset.
- * @param timestamp if timestamp is negative, offset would be reset to
broker offset at the time being;
- * otherwise, binary search is performed to locate
target offset.
- * @param offset Target offset to reset to if target queue ID is
properly provided.
+ * @param topic Required, not null.
+ * @param group Required, not null.
+ * @param queueId if target queue ID is negative, all message queues
will be reset;
+ * otherwise, only the target queue would get reset.
+ * @param timestamp if timestamp is negative, offset would be reset to
broker offset at the time being;
+ * otherwise, binary search is performed to locate target
offset.
+ * @param offset Target offset to reset to if target queue ID is
properly provided.
* @return Affected queues and their new offset
*/
private RemotingCommand resetOffsetInner(String topic, String group, int
queueId, long timestamp, Long offset) {
@@ -2260,8 +2262,8 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
runtimeInfo.put("startAcceptSendRequestTimeStamp",
String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
if
(this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
- runtimeInfo.put("timerReadBehind",
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getReadBehind()));
- runtimeInfo.put("timerOffsetBehind",
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getOffsetBehind()));
+ runtimeInfo.put("timerReadBehind",
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind()));
+ runtimeInfo.put("timerOffsetBehind",
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehindMessages()));
runtimeInfo.put("timerCongestNum",
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getAllCongestNum()));
runtimeInfo.put("timerEnqueueTps",
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueTps()));
runtimeInfo.put("timerDequeueTps",
String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueTps()));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 91e176f8c..f4a472028 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -127,7 +128,8 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
return response;
}
- protected RemotingCommand
processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader
requestHeader, String[] extraInfo, RemotingCommand response,
ChangeInvisibleTimeResponseHeader responseHeader) {
+ protected RemotingCommand
processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader
requestHeader,
+ String[] extraInfo, RemotingCommand response,
ChangeInvisibleTimeResponseHeader responseHeader) {
long popTime = ExtraInfoUtil.getPopTime(extraInfo);
long oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId());
@@ -194,6 +196,7 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("change Invisible, put ack msg fail: {}, {}",
ackMsg, putMessageResult);
}
+ PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
}
private PutMessageResult appendCheckPoint(final
ChangeInvisibleTimeRequestHeader requestHeader, int reviveQid,
@@ -229,9 +232,12 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
ck.getReviveTime(), putMessageResult);
}
- if (putMessageResult != null && putMessageResult.isOk()) {
- this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
-
this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), 1);
+ if (putMessageResult != null) {
+ PopMetricsManager.incPopReviveCkPutCount(ck,
putMessageResult.getPutMessageStatus());
+ if (putMessageResult.isOk()) {
+
this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
+
this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), 1);
+ }
}
return putMessageResult;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 4167438e9..e933f5347 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -24,16 +24,17 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
@@ -104,7 +105,7 @@ public class PopBufferMergeService extends ServiceThread {
this.waitForRunning(interval);
- if (!this.serving && this.buffer.size() == 0 && totalSize() ==
0) {
+ if (!this.serving && this.buffer.size() == 0 &&
getOffsetTotalSize() == 0) {
this.serving = true;
}
} catch (Throwable e) {
@@ -121,7 +122,7 @@ public class PopBufferMergeService extends ServiceThread {
if (!isShouldRunning()) {
return;
}
- while (this.buffer.size() > 0 || totalSize() > 0) {
+ while (this.buffer.size() > 0 || getOffsetTotalSize() > 0) {
scan();
}
}
@@ -304,6 +305,7 @@ public class PopBufferMergeService extends ServiceThread {
eclipse, count, countCk, counter.get(), offsetBufferSize);
}
}
+ PopMetricsManager.recordPopBufferScanTimeConsume(eclipse);
scanTimes++;
if (scanTimes >= countOfMinute1) {
@@ -312,7 +314,7 @@ public class PopBufferMergeService extends ServiceThread {
}
}
- private int totalSize() {
+ public int getOffsetTotalSize() {
int count = 0;
Iterator<Map.Entry<String, QueueWithTime<PopCheckPointWrapper>>>
iterator = this.commitOffsets.entrySet().iterator();
while (iterator.hasNext()) {
@@ -323,6 +325,10 @@ public class PopBufferMergeService extends ServiceThread {
return count;
}
+ public int getBufferedCKSize() {
+ return this.counter.get();
+ }
+
private void markBitCAS(AtomicInteger setBits, int index) {
while (true) {
int bits = setBits.get();
@@ -540,6 +546,7 @@ public class PopBufferMergeService extends ServiceThread {
}
MessageExtBrokerInner msgInner =
popMessageProcessor.buildCkMsg(pointWrapper.getCk(),
pointWrapper.getReviveQueueId());
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ PopMetricsManager.incPopReviveCkPutCount(pointWrapper.getCk(),
putMessageResult.getPutMessageStatus());
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
@@ -584,6 +591,7 @@ public class PopBufferMergeService extends ServiceThread {
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 95aa52091..fe654fe64 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.KeyBuilder;
@@ -66,6 +67,7 @@ public class PopReviveService extends ServiceThread {
private int queueId;
private BrokerController brokerController;
private String reviveTopic;
+ private long currentReviveMessageTimestamp = -1;
private volatile boolean shouldRunPopRevive = false;
private final NavigableMap<PopCheckPoint/* oldCK */, Pair<Long/* timestamp
*/, Boolean/* result */>> inflightReviveRequestMap =
Collections.synchronizedNavigableMap(new TreeMap<>());
@@ -86,6 +88,10 @@ public class PopReviveService extends ServiceThread {
return "PopReviveService_" + this.queueId;
}
+ public int getQueueId() {
+ return queueId;
+ }
+
public void setShouldRunPopRevive(final boolean shouldRunPopRevive) {
this.shouldRunPopRevive = shouldRunPopRevive;
}
@@ -120,6 +126,7 @@ public class PopReviveService extends ServiceThread {
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint,
putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId
{}, offset {}, reviveDelay={}, result is {} ",
queueId, popCheckPoint, messageExt.getQueueId(),
messageExt.getQueueOffset(),
@@ -197,11 +204,13 @@ public class PopReviveService extends ServiceThread {
|| pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL &&
offset == pullResult.getMaxOffset();
}
- private CompletableFuture<Pair<GetMessageStatus, MessageExt>>
getBizMessage(String topic, long offset, int queueId, String brokerName) {
+ private CompletableFuture<Pair<GetMessageStatus, MessageExt>>
getBizMessage(String topic, long offset, int queueId,
+ String brokerName) {
return this.brokerController.getEscapeBridge().getMessageAsync(topic,
offset, queueId, brokerName, false);
}
- public PullResult getMessage(String group, String topic, int queueId, long
offset, int nums, boolean deCompressBody) {
+ public PullResult getMessage(String group, String topic, int queueId, long
offset, int nums,
+ boolean deCompressBody) {
GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(group, topic, queueId,
offset, nums, null);
if (getMessageResult != null) {
@@ -315,7 +324,7 @@ public class PopReviveService extends ServiceThread {
List<MessageExt> messageExts = getReviveMessage(offset, queueId);
if (messageExts == null || messageExts.isEmpty()) {
long old = endTime;
- long timerDelay =
brokerController.getMessageStore().getTimerMessageStore().getReadBehind();
+ long timerDelay =
brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind();
long commitLogDelay =
brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehind();
// move endTime
if (endTime != 0 && System.currentTimeMillis() - endTime > 3 *
PopAckConstants.SECOND && timerDelay <= 0 && commitLogDelay <= 0) {
@@ -355,6 +364,7 @@ public class PopReviveService extends ServiceThread {
continue;
}
map.put(point.getTopic() + point.getCId() +
point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
+ PopMetricsManager.incPopReviveCkGetCount(point, queueId);
point.setReviveOffset(messageExt.getQueueOffset());
if (firstRt == 0) {
firstRt = point.getReviveTime();
@@ -365,6 +375,7 @@ public class PopReviveService extends ServiceThread {
POP_LOGGER.info("reviveQueueId={},find ack, offset:{},
raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
+ PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
String mergeKey = ackMsg.getTopic() +
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() +
ackMsg.getPopTime();
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
@@ -555,6 +566,25 @@ public class PopReviveService extends ServiceThread {
brokerController.getMessageStore().putMessage(ckMsg);
}
+ /**
+ * Estimates how long revive processing has been lagging, in milliseconds.
+ *
+ * @return 0 when no revive timestamp has been recorded yet, or when the gap
+ * between the revive queue's max offset and {@code reviveOffset} is at most 1;
+ * otherwise the non-negative time elapsed since the recorded timestamp
+ */
+ public long getReviveBehindMillis() {
+ if (currentReviveMessageTimestamp <= 0) {
+ return 0;
+ }
+ long pending = brokerController.getMessageStore().getMaxOffsetInQueue(reviveTopic, queueId) - reviveOffset;
+ if (pending <= 1) {
+ return 0;
+ }
+ long elapsed = System.currentTimeMillis() - currentReviveMessageTimestamp;
+ return Math.max(0, elapsed);
+ }
+
+ /**
+ * Reports how many revive-queue entries are still ahead of {@code reviveOffset}.
+ *
+ * @return 0 when no revive timestamp has been recorded yet; otherwise the
+ * revive queue's max offset minus {@code reviveOffset}, floored at 0
+ */
+ public long getReviveBehindMessages() {
+ if (currentReviveMessageTimestamp <= 0) {
+ return 0;
+ }
+ long maxOffset = brokerController.getMessageStore().getMaxOffsetInQueue(reviveTopic, queueId);
+ long backlog = maxOffset - reviveOffset;
+ return backlog > 0 ? backlog : 0;
+ }
+
@Override
public void run() {
int slow = 1;
@@ -586,7 +616,10 @@ public class PopReviveService extends ServiceThread {
long delay = 0;
if (sortList != null && !sortList.isEmpty()) {
delay = (System.currentTimeMillis() -
sortList.get(0).getReviveTime()) / 1000;
+ currentReviveMessageTimestamp =
sortList.get(0).getReviveTime();
slow = 1;
+ } else {
+ currentReviveMessageTimestamp = System.currentTimeMillis();
}
POP_LOGGER.info("reviveQueueId={},revive finish,old offset is
{}, new offset is {}, ckDelay={} ",
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index 79fe6d587..89ffed7e3 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -90,7 +90,7 @@ public class PopReviveServiceTest {
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
- when(timerMessageStore.getReadBehind()).thenReturn(0L);
+ when(timerMessageStore.getDequeueBehind()).thenReturn(0L);
when(timerMessageStore.getEnqueueBehind()).thenReturn(0L);
when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(new
TopicConfig());
@@ -106,7 +106,7 @@ public class PopReviveServiceTest {
long maxReviveOffset = 4;
when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP,
REVIVE_TOPIC, REVIVE_QUEUE_ID))
- .thenReturn(0L);
+ .thenReturn(0L);
List<MessageExt> reviveMessageExtList = new ArrayList<>();
long basePopTime = System.currentTimeMillis();
{
@@ -249,14 +249,15 @@ public class PopReviveServiceTest {
return msgInner;
}
- public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long
deliverMs, long reviveOffset, long deliverTime) {
+ public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long
deliverMs, long reviveOffset,
+ long deliverTime) {
MessageExtBrokerInner messageExtBrokerInner = buildAckInnerMessage(
- REVIVE_TOPIC,
- ackMsg,
- REVIVE_QUEUE_ID,
- STORE_HOST,
- deliverMs,
- PopMessageProcessor.genAckUniqueId(ackMsg)
+ REVIVE_TOPIC,
+ ackMsg,
+ REVIVE_QUEUE_ID,
+ STORE_HOST,
+ deliverMs,
+ PopMessageProcessor.genAckUniqueId(ackMsg)
);
messageExtBrokerInner.setQueueOffset(reviveOffset);
messageExtBrokerInner.setDeliverTimeMs(deliverMs);
@@ -264,7 +265,8 @@ public class PopReviveServiceTest {
return messageExtBrokerInner;
}
- public static MessageExtBrokerInner buildAckInnerMessage(String
reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs,
String ackUniqueId) {
+ public static MessageExtBrokerInner buildAckInnerMessage(String
reviveTopic, AckMsg ackMsg, int reviveQid,
+ SocketAddress host, long deliverMs, String ackUniqueId) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
diff --git
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
index b5993222c..271604b1e 100644
---
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
@@ -22,8 +22,18 @@ public class DefaultStoreMetricsConstant {
public static final String GAUGE_STORAGE_DISPATCH_BEHIND =
"rocketmq_storage_dispatch_behind_bytes";
public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME =
"rocketmq_storage_message_reserve_time";
+ public static final String GAUGE_TIMER_ENQUEUE_LAG =
"rocketmq_timer_enqueue_lag";
+ public static final String GAUGE_TIMER_ENQUEUE_LATENCY =
"rocketmq_timer_enqueue_latency";
+ public static final String GAUGE_TIMER_DEQUEUE_LAG =
"rocketmq_timer_dequeue_lag";
+ public static final String GAUGE_TIMER_DEQUEUE_LATENCY =
"rocketmq_timer_dequeue_latency";
+ public static final String GAUGE_TIMING_MESSAGES =
"rocketmq_timing_messages";
+
+ public static final String COUNTER_TIMER_ENQUEUE_TOTAL =
"rocketmq_timer_enqueue_total";
+ public static final String COUNTER_TIMER_DEQUEUE_TOTAL =
"rocketmq_timer_dequeue_total";
+
public static final String LABEL_STORAGE_TYPE = "storage_type";
public static final String DEFAULT_STORAGE_TYPE = "local";
public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
public static final String DEFAULT_STORAGE_MEDIUM = "disk";
+ public static final String LABEL_TOPIC = "topic";
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index 020974a22..686265292 100644
---
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store.metrics;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
@@ -27,18 +28,28 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_DEQUEUE_TOTAL;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_ENQUEUE_TOTAL;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_MEDIUM;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_TYPE;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_DISPATCH_BEHIND;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_FLUSH_BEHIND;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_MESSAGE_RESERVE_TIME;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_SIZE;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LAG;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LATENCY;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LAG;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LATENCY;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMING_MESSAGES;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
public class DefaultStoreMetricsManager {
public static Supplier<AttributesBuilder> attributesBuilderSupplier;
@@ -49,6 +60,15 @@ public class DefaultStoreMetricsManager {
public static ObservableLongGauge dispatchBehind = new
NopObservableLongGauge();
public static ObservableLongGauge messageReserveTime = new
NopObservableLongGauge();
+ public static ObservableLongGauge timerEnqueueLag = new
NopObservableLongGauge();
+ public static ObservableLongGauge timerEnqueueLatency = new
NopObservableLongGauge();
+ public static ObservableLongGauge timerDequeueLag = new
NopObservableLongGauge();
+ public static ObservableLongGauge timerDequeueLatency = new
NopObservableLongGauge();
+ public static ObservableLongGauge timingMessages = new
NopObservableLongGauge();
+
+ public static LongCounter timerDequeueTotal = new NopLongCounter();
+ public static LongCounter timerEnqueueTotal = new NopLongCounter();
+
public static List<Pair<InstrumentSelector, View>> getMetricsView() {
return Collections.emptyList();
}
@@ -95,6 +115,72 @@ public class DefaultStoreMetricsManager {
}
measurement.record(System.currentTimeMillis() -
earliestMessageTime, newAttributesBuilder().build());
});
+
+ timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
+ .setDescription("Timer enqueue messages lag")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore =
messageStore.getTimerMessageStore();
+
measurement.record(timerMessageStore.getEnqueueBehindMessages(),
newAttributesBuilder().build());
+ });
+
+ timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
+ .setDescription("Timer enqueue latency")
+ .setUnit("milliseconds")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore =
messageStore.getTimerMessageStore();
+ measurement.record(timerMessageStore.getEnqueueBehindMillis(),
newAttributesBuilder().build());
+ });
+ timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
+ .setDescription("Timer dequeue messages lag")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore =
messageStore.getTimerMessageStore();
+
measurement.record(timerMessageStore.getDequeueBehindMessages(),
newAttributesBuilder().build());
+ });
+ // The gauge is declared in milliseconds, so record the millisecond lag;
+ // getDequeueBehind() is the same value divided by 1000.
+ timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
+ .setDescription("Timer dequeue latency")
+ .setUnit("milliseconds")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+ measurement.record(timerMessageStore.getDequeueBehindMillis(), newAttributesBuilder().build());
+ });
+ timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
+ .setDescription("Current message number in timing")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMessageStore timerMessageStore =
messageStore.getTimerMessageStore();
+ timerMessageStore.getTimerMetrics()
+ .getTimingCount()
+ .forEach((topic, metric) -> {
+ measurement.record(
+ metric.getCount().get(),
+ newAttributesBuilder().put(LABEL_TOPIC,
topic).build()
+ );
+ });
+ });
+ timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
+ .setDescription("Total number of timer dequeue")
+ .build();
+ timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
+ .setDescription("Total number of timer enqueue")
+ .build();
+ }
+
+ /**
+ * Adds one to the timer dequeue counter.
+ *
+ * @param topic topic label for the measurement; when {@code null} the
+ * measurement is recorded without a topic label
+ */
+ public static void incTimerDequeueCount(String topic) {
+ AttributesBuilder attributesBuilder = newAttributesBuilder();
+ // Guard against a missing topic the same way incTimerEnqueueCount does.
+ if (topic != null) {
+ attributesBuilder.put(LABEL_TOPIC, topic);
+ }
+ timerDequeueTotal.add(1, attributesBuilder.build());
+ }
+
+ /**
+ * Adds one to the timer enqueue counter.
+ *
+ * @param topic topic label for the measurement; when {@code null} the
+ * measurement is recorded without a topic label
+ */
+ public static void incTimerEnqueueCount(String topic) {
+ AttributesBuilder attributesBuilder = newAttributesBuilder();
+ if (topic != null) {
+ attributesBuilder.put(LABEL_TOPIC, topic);
+ }
+ timerEnqueueTotal.add(1, attributesBuilder.build());
}
public static AttributesBuilder newAttributesBuilder() {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index c4f7e6c77..c6ab81df4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -17,32 +17,6 @@
package org.apache.rocketmq.store.timer;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
-import java.util.function.Function;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.TopicFilterType;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.PutMessageResult;
-import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.store.config.BrokerRole;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -66,6 +40,32 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.util.PerfCounter;
public class TimerMessageStore {
@@ -1106,6 +1106,13 @@ public class TimerMessageStore {
return msgInner;
}
+ /**
+ * Looks up the {@code PROPERTY_REAL_TOPIC} property of a message.
+ *
+ * @param msgExt message to inspect, may be {@code null}
+ * @return the property value, or {@code null} when {@code msgExt} is {@code null}
+ */
+ private String getRealTopic(MessageExt msgExt) {
+ return msgExt == null ? null : msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
+ }
+
private long formatTimeMs(long timeMs) {
return timeMs / precisionMs * precisionMs;
}
@@ -1300,6 +1307,7 @@ public class TimerMessageStore {
req.setLatch(latch);
try {
perfs.startTick("enqueue_put");
+
DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
if (shouldRunningDequeue && req.getDelayTime()
< currWriteTimeMs) {
dequeuePutQueue.put(req);
} else {
@@ -1414,6 +1422,7 @@ public class TimerMessageStore {
}
try {
perfs.startTick("dequeue_put");
+
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg()));
addMetric(tr.getMsg(), -1);
MessageExtBrokerInner msg =
convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
doRes = PUT_NEED_RETRY != doPut(msg,
needRoll(tr.getMagic()));
@@ -1600,7 +1609,7 @@ public class TimerMessageStore {
TimerMessageStore.LOGGER.info("[{}]Timer
progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{}
currReadOffset:{} offsetBehind:{} behindMaster:{} " +
"enqPutQueue:{} deqGetQueue:{} deqPutQueue:{}
allCongestNum:{} enqExpiredStoreTime:{}",
storeConfig.getBrokerRole(),
- format(commitReadTimeMs), format(currReadTimeMs),
format(currWriteTimeMs), getReadBehind(),
+ format(commitReadTimeMs), format(currReadTimeMs),
format(currWriteTimeMs), getDequeueBehind(),
tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset,
timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
enqueuePutQueue.size(), dequeueGetQueue.size(),
dequeuePutQueue.size(), getAllCongestNum(),
format(lastEnqueueButExpiredStoreTime));
}
@@ -1636,22 +1645,34 @@ public class TimerMessageStore {
return false;
}
- public long getEnqueueBehind() {
+ /**
+ * Reports the offset gap between the timer topic's consume queue (queue 0)
+ * and {@code currQueueOffset}.
+ *
+ * @return the consume queue's max offset (0 when the queue does not exist)
+ * minus the current queue offset
+ */
+ public long getEnqueueBehindMessages() {
+ long consumedOffset = currQueueOffset;
+ ConsumeQueue timerQueue = (ConsumeQueue) messageStore.getConsumeQueue(TIMER_TOPIC, 0);
+ long newestOffset = 0;
+ if (timerQueue != null) {
+ newestOffset = timerQueue.getMaxOffsetInQueue();
+ }
+ return newestOffset - consumedOffset;
+ }
+
+ /**
+ * Reports the enqueue lag in milliseconds.
+ *
+ * @return the time elapsed since {@code lastEnqueueButExpiredStoreTime} when
+ * {@code lastEnqueueButExpiredTime} is less than 2000 ms old, otherwise 0
+ */
+ public long getEnqueueBehindMillis() {
if (System.currentTimeMillis() - lastEnqueueButExpiredTime < 2000) {
- return (System.currentTimeMillis() - lastEnqueueButExpiredStoreTime) / 1000;
+ // Keep the raw millisecond difference; getEnqueueBehind() performs the conversion to seconds.
+ return System.currentTimeMillis() - lastEnqueueButExpiredStoreTime;
}
return 0;
}
- public long getReadBehind() {
- return (System.currentTimeMillis() - currReadTimeMs) / 1000;
+ /**
+ * Returns {@link #getEnqueueBehindMillis()} divided by 1000.
+ */
+ public long getEnqueueBehind() {
+ return getEnqueueBehindMillis() / 1000;
}
- public long getOffsetBehind() {
- long tmpQueueOffset = currQueueOffset;
- ConsumeQueue cq = (ConsumeQueue)
messageStore.getConsumeQueue(TIMER_TOPIC, 0);
- long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue();
- return maxOffsetInQueue - tmpQueueOffset;
+ /**
+ * Returns {@code timerWheel.getAllNum(currReadTimeMs)}.
+ * NOTE(review): presumably the number of messages held in the wheel from the
+ * current read time onward; confirm against TimerWheel.
+ */
+ public long getDequeueBehindMessages() {
+ return timerWheel.getAllNum(currReadTimeMs);
+ }
+
+ /**
+ * Returns the difference, in milliseconds, between the current wall-clock
+ * time and {@code currReadTimeMs}.
+ */
+ public long getDequeueBehindMillis() {
+ return System.currentTimeMillis() - currReadTimeMs;
+ }
+
+ /**
+ * Returns {@link #getDequeueBehindMillis()} converted to whole seconds.
+ */
+ public long getDequeueBehind() {
+ return getDequeueBehindMillis() / 1000;
}
public float getEnqueueTps() {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 1d7aeae38..0b1194953 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -166,7 +166,7 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
if (result == AppendResult.SUCCESS) {
Attributes attributes =
TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC,
request.getTopic())
- .put(TieredStoreMetricsConstant.LABEL_QUEUE,
request.getQueueId())
+ .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID,
request.getQueueId())
.put(TieredStoreMetricsConstant.LABEL_FILE_TYPE,
TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(1,
attributes);
@@ -271,7 +271,7 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
}
Attributes attributes =
TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC, mq.getTopic())
- .put(TieredStoreMetricsConstant.LABEL_QUEUE, mq.getQueueId())
+ .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID,
mq.getQueueId())
.put(TieredStoreMetricsConstant.LABEL_FILE_TYPE,
TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(queueOffset -
beforeOffset, attributes);
@@ -290,7 +290,8 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
}
}
- public void handleAppendCommitLogResult(AppendResult result,
TieredMessageQueueContainer container, long queueOffset,
+ public void handleAppendCommitLogResult(AppendResult result,
TieredMessageQueueContainer container,
+ long queueOffset,
long dispatchOffset, long newCommitLogOffset, int size, long tagCode,
ByteBuffer message) {
MessageQueue mq = container.getMessageQueue();
String topic = mq.getTopic();
@@ -449,7 +450,7 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
cqMetricsMap.forEach((messageQueue, count) -> {
Attributes attributes =
TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC,
messageQueue.getTopic())
- .put(TieredStoreMetricsConstant.LABEL_QUEUE,
messageQueue.getQueueId())
+ .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID,
messageQueue.getQueueId())
.put(TieredStoreMetricsConstant.LABEL_FILE_TYPE,
TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(count,
attributes);
@@ -457,7 +458,7 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
ifMetricsMap.forEach((messageQueue, count) -> {
Attributes attributes =
TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_TOPIC,
messageQueue.getTopic())
- .put(TieredStoreMetricsConstant.LABEL_QUEUE,
messageQueue.getQueueId())
+ .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID,
messageQueue.getQueueId())
.put(TieredStoreMetricsConstant.LABEL_FILE_TYPE,
TieredFileSegment.FileSegmentType.INDEX.name().toLowerCase())
.build();
TieredStoreMetricsManager.messagesDispatchTotal.add(count,
attributes);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
index 3029d5dd5..ad7281510 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
@@ -40,7 +40,7 @@ public class TieredStoreMetricsConstant {
public static final String LABEL_TOPIC = "topic";
public static final String LABEL_GROUP = "group";
- public static final String LABEL_QUEUE = "queue";
+ public static final String LABEL_QUEUE_ID = "queue_id";
public static final String LABEL_FILE_TYPE = "file_type";
// blob constants
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 69fe28047..0b0dfd63a 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -70,7 +70,7 @@ import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant
import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.HISTOGRAM_PROVIDER_RPC_LATENCY;
import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.HISTOGRAM_UPLOAD_BYTES;
import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
-import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE;
+import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE_ID;
import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC;
import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.STORAGE_MEDIUM_BLOB;
@@ -181,13 +181,13 @@ public class TieredStoreMetricsManager {
Attributes commitLogAttributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
- .put(LABEL_QUEUE, mq.getQueueId())
+ .put(LABEL_QUEUE_ID, mq.getQueueId())
.put(LABEL_FILE_TYPE,
TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
measurement.record(Math.max(maxOffset -
container.getDispatchOffset(), 0), commitLogAttributes);
Attributes consumeQueueAttributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
- .put(LABEL_QUEUE, mq.getQueueId())
+ .put(LABEL_QUEUE_ID, mq.getQueueId())
.put(LABEL_FILE_TYPE,
TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
.build();
measurement.record(Math.max(maxOffset -
container.getConsumeQueueMaxOffset(), 0), consumeQueueAttributes);
@@ -209,7 +209,7 @@ public class TieredStoreMetricsManager {
Attributes commitLogAttributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
- .put(LABEL_QUEUE, mq.getQueueId())
+ .put(LABEL_QUEUE_ID, mq.getQueueId())
.put(LABEL_FILE_TYPE,
TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
.build();
long commitLogDispatchLatency =
next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(),
container.getDispatchOffset());
@@ -221,7 +221,7 @@ public class TieredStoreMetricsManager {
Attributes consumeQueueAttributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
- .put(LABEL_QUEUE, mq.getQueueId())
+ .put(LABEL_QUEUE_ID, mq.getQueueId())
.put(LABEL_FILE_TYPE,
TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
.build();
long consumeQueueDispatchOffset =
container.getConsumeQueueMaxOffset();
@@ -307,7 +307,7 @@ public class TieredStoreMetricsManager {
MessageQueue mq = container.getMessageQueue();
Attributes attributes = newAttributesBuilder()
.put(LABEL_TOPIC, mq.getTopic())
- .put(LABEL_QUEUE, mq.getQueueId())
+ .put(LABEL_QUEUE_ID, mq.getQueueId())
.build();
measurement.record(System.currentTimeMillis() -
timestamp, attributes);
}