This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6d7513425c [RIP-46] Enhanced metrics for timing and transactional
messages (#7500)
6d7513425c is described below
commit 6d7513425c2aeb17e527be9d0d98d47f7251927d
Author: Ji Juntao <[email protected]>
AuthorDate: Mon Jan 22 16:56:05 2024 +0800
[RIP-46] Enhanced metrics for timing and transactional messages (#7500)
* add request codes' distribution and timing messages' distribution
* remove the requestCode distribution.
* add delay message latency distribution.
* add transaction metrics
* transaction metric of topics finished, v1.
* add the transaction metrics, to be tested.
* fix the judgement of putMessageResult
* optimize.
* add config.
* fix test case.
* add unit tests for transactionMetrics.
* remove chinese character
* add rocksdb metrics.
* add more rocksdb metrics.
* fix NPE
* avoid the total time is 0.
* add license
* remove useless import.
---
.../apache/rocketmq/broker/BrokerController.java | 9 +
.../rocketmq/broker/BrokerPathConfigHelper.java | 3 +
.../broker/metrics/BrokerMetricsConstant.java | 5 +
.../broker/metrics/BrokerMetricsManager.java | 72 +++++-
.../broker/processor/EndTransactionProcessor.java | 18 ++
.../broker/processor/SendMessageProcessor.java | 26 ++-
.../broker/transaction/TransactionMetrics.java | 259 +++++++++++++++++++++
.../TransactionMetricsFlushService.java | 55 +++++
.../transaction/TransactionalMessageService.java | 5 +
.../DefaultTransactionalMessageCheckListener.java | 2 +
.../queue/TransactionalMessageServiceImpl.java | 19 ++
.../processor/EndTransactionProcessorTest.java | 5 +
.../transaction/queue/TransactionMetricsTest.java | 83 +++++++
.../util/TransactionalMessageServiceImpl.java | 11 +
.../org/apache/rocketmq/common/BrokerConfig.java | 10 +
.../org/apache/rocketmq/common/ConfigManager.java | 11 +-
.../common/config/RocksDBConfigManager.java | 13 +-
.../common/metrics/NopObservableDoubleGauge.java | 22 ++
.../remoting/metrics/RemotingMetricsConstant.java | 1 -
.../apache/rocketmq/store/RocksDBMessageStore.java | 12 +
.../store/metrics/DefaultStoreMetricsConstant.java | 12 +
.../store/metrics/DefaultStoreMetricsManager.java | 63 ++++-
.../store/metrics/RocksDBStoreMetricsManager.java | 154 ++++++++++++
.../store/queue/RocksDBConsumeQueueStore.java | 4 +
.../rocketmq/store/timer/TimerMessageStore.java | 5 +
25 files changed, 856 insertions(+), 23 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8d29d44383..af90e5f87e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -104,6 +104,7 @@ import
org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
+import org.apache.rocketmq.broker.transaction.TransactionMetricsFlushService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import
org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
@@ -277,6 +278,7 @@ public class BrokerController {
private BrokerMetricsManager brokerMetricsManager;
private ColdDataPullRequestHoldService coldDataPullRequestHoldService;
private ColdDataCgCtrService coldDataCgCtrService;
+ private TransactionMetricsFlushService transactionMetricsFlushService;
public BrokerController(
final BrokerConfig brokerConfig,
@@ -963,6 +965,9 @@ public class BrokerController {
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new
TransactionalMessageCheckService(this);
+ this.transactionMetricsFlushService = new
TransactionMetricsFlushService(this);
+ this.transactionMetricsFlushService.start();
+
}
private void initialAcl() {
@@ -1440,6 +1445,10 @@ public class BrokerController {
this.endTransactionExecutor.shutdown();
}
+ if (this.transactionMetricsFlushService != null) {
+ this.transactionMetricsFlushService.shutdown();
+ }
+
if (this.escapeBridge != null) {
escapeBridge.shutdown();
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index cea321ef78..0b2f52f32e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -60,6 +60,9 @@ public class BrokerPathConfigHelper {
public static String getTimerMetricsPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator +
"timermetrics";
}
+ public static String getTransactionMetricsPath(final String rootDir) {
+ return rootDir + File.separator + "config" + File.separator +
"transactionMetrics";
+ }
public static String getConsumerFilterPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator +
"consumerFilter.json";
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
index 73b40f6ba5..5733aa40ba 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
@@ -38,6 +38,11 @@ public class BrokerMetricsConstant {
public static final String GAUGE_CONSUMER_READY_MESSAGES =
"rocketmq_consumer_ready_messages";
public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL =
"rocketmq_send_to_dlq_messages_total";
+ public static final String COUNTER_COMMIT_MESSAGES_TOTAL =
"rocketmq_commit_messages_total";
+ public static final String COUNTER_ROLLBACK_MESSAGES_TOTAL =
"rocketmq_rollback_messages_total";
+ public static final String HISTOGRAM_FINISH_MSG_LATENCY =
"rocketmq_finish_message_latency";
+ public static final String GAUGE_HALF_MESSAGES = "rocketmq_half_messages";
+
public static final String LABEL_CLUSTER_NAME = "cluster";
public static final String LABEL_NODE_TYPE = "node_type";
public static final String NODE_TYPE_BROKER = "broker";
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 307fc02ef0..fc7e97bda9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -40,13 +40,6 @@ import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.resources.Resource;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -68,12 +61,23 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
import org.slf4j.bridge.SLF4JBridgeHandler;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_COMMIT_MESSAGES_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_ROLLBACK_MESSAGES_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
@@ -83,8 +87,10 @@ import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CON
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_HALF_MESSAGES;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
@@ -141,6 +147,10 @@ public class BrokerMetricsManager {
public static ObservableLongGauge consumerQueueingLatency = new
NopObservableLongGauge();
public static ObservableLongGauge consumerReadyMessages = new
NopObservableLongGauge();
public static LongCounter sendToDlqMessages = new NopLongCounter();
+ public static ObservableLongGauge halfMessages = new
NopObservableLongGauge();
+ public static LongCounter commitMessagesTotal = new NopLongCounter();
+ public static LongCounter rollBackMessagesTotal = new NopLongCounter();
+ public static LongHistogram transactionFinishLatency = new
NopLongHistogram();
public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new
ArrayList<String>() {
{
@@ -348,6 +358,7 @@ public class BrokerMetricsManager {
initRequestMetrics();
initConnectionMetrics();
initLagAndDlqMetrics();
+ initTransactionMetrics();
initOtherMetrics();
}
@@ -361,6 +372,15 @@ public class BrokerMetricsManager {
2d * 1024 * 1024, //2MB
4d * 1024 * 1024 //4MB
);
+
+ List<Double> commitLatencyBuckets = Arrays.asList(
+ 1d * 1 * 1 * 5, //5s
+ 1d * 1 * 1 * 60, //1min
+ 1d * 1 * 10 * 60, //10min
+ 1d * 1 * 60 * 60, //1h
+ 1d * 12 * 60 * 60, //12h
+ 1d * 24 * 60 * 60 //24h
+ );
InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_MESSAGE_SIZE)
@@ -371,6 +391,16 @@ public class BrokerMetricsManager {
SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder,
brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(messageSizeSelector,
messageSizeViewBuilder.build());
+ InstrumentSelector commitLatencySelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_FINISH_MSG_LATENCY)
+ .build();
+ ViewBuilder commitLatencyViewBuilder = View.builder()
+
.setAggregation(Aggregation.explicitBucketHistogram(commitLatencyBuckets));
+ // To config the cardinalityLimit for openTelemetry metrics exporting.
+ SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder,
brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(commitLatencySelector,
commitLatencyViewBuilder.build());
+
for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair :
RemotingMetricsManager.getMetricsView()) {
ViewBuilder viewBuilder = selectorViewPair.getObject2();
SdkMeterProviderUtil.setCardinalityLimit(viewBuilder,
brokerConfig.getMetricsOtelCardinalityLimit());
@@ -560,6 +590,34 @@ public class BrokerMetricsManager {
.build();
}
+ private void initTransactionMetrics() {
+ commitMessagesTotal =
brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
+ .setDescription("Total number of commit messages")
+ .build();
+
+ rollBackMessagesTotal =
brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL)
+ .setDescription("Total number of rollback messages")
+ .build();
+
+ transactionFinishLatency =
brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY)
+ .setDescription("Transaction finish latency")
+ .ofLongs()
+ .setUnit("ms")
+ .build();
+
+ halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES)
+ .setDescription("Half messages of all topics")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+
brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
+ .forEach((topic, metric) -> {
+ measurement.record(
+ metric.getCount().get(),
+
newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC,
topic).build()
+ );
+ });
+ });
+ }
private void initOtherMetrics() {
RemotingMetricsManager.initMetrics(brokerMeter,
BrokerMetricsManager::newAttributesBuilder);
messageStore.initMetrics(brokerMeter,
BrokerMetricsManager::newAttributesBuilder);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index f6aa0d48c9..e812a53ba7 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.TopicFilterType;
@@ -40,6 +41,8 @@ import
org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+
/**
* EndTransaction processor: process commit and rollback message
*/
@@ -144,6 +147,16 @@ public class EndTransactionProcessor implements
NettyRequestProcessor {
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
+ // successful committed, then total num of
half-messages minus 1
+
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(),
-1);
+ BrokerMetricsManager.commitMessagesTotal.add(1,
BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC, msgInner.getTopic())
+ .build());
+ // record the commit latency.
+ Long commitLatency = (System.currentTimeMillis() -
result.getPrepareMessage().getBornTimestamp()) / 1000;
+
BrokerMetricsManager.transactionFinishLatency.record(commitLatency,
BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC, msgInner.getTopic())
+ .build());
}
return sendResult;
}
@@ -161,6 +174,11 @@ public class EndTransactionProcessor implements
NettyRequestProcessor {
RemotingCommand res =
checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
+ // roll back, then total num of half-messages minus 1
+
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC),
-1);
+ BrokerMetricsManager.rollBackMessagesTotal.add(1,
BrokerMetricsManager.newAttributesBuilder()
+ .put(LABEL_TOPIC,
result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
+ .build());
}
return res;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 4ec84c1461..912d502eab 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -18,11 +18,6 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
@@ -65,10 +60,17 @@ import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
@@ -300,7 +302,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
// Map<String, String> oriProps =
MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag =
oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- boolean sendTransactionPrepareMessage = false;
+ boolean sendTransactionPrepareMessage;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 &&
msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if
(this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
@@ -311,6 +313,8 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
return response;
}
sendTransactionPrepareMessage = true;
+ } else {
+ sendTransactionPrepareMessage = false;
}
long beginTimeMillis = this.brokerController.getMessageStore().now();
@@ -332,6 +336,12 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
if (responseFuture != null) {
doResponse(ctx, request, responseFuture);
}
+
+ // record the transaction metrics, responseFuture == null
means put successfully
+ if (sendTransactionPrepareMessage && (responseFuture == null
|| responseFuture.getCode() == ResponseCode.SUCCESS)) {
+
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC),
1);
+ }
+
sendMessageCallback.onComplete(sendMessageContext, response);
}, this.brokerController.getPutMessageFutureExecutor());
// Returns null to release the send message thread
@@ -344,6 +354,10 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
}
handlePutMessageResult(putMessageResult, response, request,
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis,
mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
+ // record the transaction metrics
+ if (putMessageResult.getPutMessageStatus() ==
PutMessageStatus.PUT_OK && putMessageResult.getAppendMessageResult().isOk()) {
+
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC),
1);
+ }
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
new file mode 100644
index 0000000000..ad30c73c60
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.transaction;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.google.common.io.Files;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class TransactionMetrics extends ConfigManager {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private ConcurrentMap<String, Metric> transactionCounts =
+ new ConcurrentHashMap<>(1024);
+
+ private DataVersion dataVersion = new DataVersion();
+
+ private final String configPath;
+
+ public TransactionMetrics(String configPath) {
+ this.configPath = configPath;
+ }
+
+ public long addAndGet(String topic, int value) {
+ Metric pair = getTopicPair(topic);
+ getDataVersion().nextVersion();
+ pair.setTimeStamp(System.currentTimeMillis());
+ return pair.getCount().addAndGet(value);
+ }
+
+ public Metric getTopicPair(String topic) {
+ Metric pair = transactionCounts.get(topic);
+ if (null != pair) {
+ return pair;
+ }
+ pair = new Metric();
+ final Metric previous = transactionCounts.putIfAbsent(topic, pair);
+ if (null != previous) {
+ return previous;
+ }
+ return pair;
+ }
+ public long getTransactionCount(String topic) {
+ Metric pair = transactionCounts.get(topic);
+ if (null == pair) {
+ return 0;
+ } else {
+ return pair.getCount().get();
+ }
+ }
+
+ public Map<String, Metric> getTransactionCounts() {
+ return transactionCounts;
+ }
+ public void setTransactionCounts(ConcurrentMap<String, Metric>
transactionCounts) {
+ this.transactionCounts = transactionCounts;
+ }
+
+ protected void write0(Writer writer) {
+ TransactionMetricsSerializeWrapper wrapper = new
TransactionMetricsSerializeWrapper();
+ wrapper.setTransactionCount(transactionCounts);
+ wrapper.setDataVersion(dataVersion);
+ JSON.writeJSONString(writer, wrapper,
SerializerFeature.BrowserCompatible);
+ }
+
+ @Override
+ public String encode() {
+ return encode(false);
+ }
+
+ @Override
+ public String configFilePath() {
+ return configPath;
+ }
+
+ @Override
+ public void decode(String jsonString) {
+ if (jsonString != null) {
+ TransactionMetricsSerializeWrapper
transactionMetricsSerializeWrapper =
+ TransactionMetricsSerializeWrapper.fromJson(jsonString,
TransactionMetricsSerializeWrapper.class);
+ if (transactionMetricsSerializeWrapper != null) {
+
this.transactionCounts.putAll(transactionMetricsSerializeWrapper.getTransactionCount());
+
this.dataVersion.assignNewOne(transactionMetricsSerializeWrapper.getDataVersion());
+ }
+ }
+ }
+
+ @Override
+ public String encode(boolean prettyFormat) {
+ TransactionMetricsSerializeWrapper metricsSerializeWrapper = new
TransactionMetricsSerializeWrapper();
+ metricsSerializeWrapper.setDataVersion(this.dataVersion);
+ metricsSerializeWrapper.setTransactionCount(this.transactionCounts);
+ return metricsSerializeWrapper.toJson(prettyFormat);
+ }
+
+ public DataVersion getDataVersion() {
+ return dataVersion;
+ }
+
+ public void setDataVersion(DataVersion dataVersion) {
+ this.dataVersion = dataVersion;
+ }
+
+ public void cleanMetrics(Set<String> topics) {
+ if (topics == null || topics.isEmpty()) {
+ return;
+ }
+ Iterator<Map.Entry<String, Metric>> iterator =
transactionCounts.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Metric> entry = iterator.next();
+ final String topic = entry.getKey();
+ if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)) {
+ continue;
+ }
+ if (!topics.contains(topic)) {
+ continue;
+ }
+ // in the input topics set, then remove it.
+ iterator.remove();
+ }
+ }
+
+ public static class TransactionMetricsSerializeWrapper extends
RemotingSerializable {
+ private ConcurrentMap<String, Metric> transactionCount =
+ new ConcurrentHashMap<>(1024);
+ private DataVersion dataVersion = new DataVersion();
+
+ public ConcurrentMap<String, Metric> getTransactionCount() {
+ return transactionCount;
+ }
+
+ public void setTransactionCount(
+ ConcurrentMap<String, Metric> transactionCount) {
+ this.transactionCount = transactionCount;
+ }
+
+ public DataVersion getDataVersion() {
+ return dataVersion;
+ }
+
+ public void setDataVersion(DataVersion dataVersion) {
+ this.dataVersion = dataVersion;
+ }
+ }
+
+ @Override
+ public synchronized void persist() {
+ String config = configFilePath();
+ String temp = config + ".tmp";
+ String backup = config + ".bak";
+ BufferedWriter bufferedWriter = null;
+ try {
+ File tmpFile = new File(temp);
+ File parentDirectory = tmpFile.getParentFile();
+ if (!parentDirectory.exists()) {
+ if (!parentDirectory.mkdirs()) {
+ log.error("Failed to create directory: {}",
parentDirectory.getCanonicalPath());
+ return;
+ }
+ }
+
+ if (!tmpFile.exists()) {
+ if (!tmpFile.createNewFile()) {
+ log.error("Failed to create file: {}",
tmpFile.getCanonicalPath());
+ return;
+ }
+ }
+ bufferedWriter = new BufferedWriter(new OutputStreamWriter(new
FileOutputStream(tmpFile, false),
+ StandardCharsets.UTF_8));
+ write0(bufferedWriter);
+ bufferedWriter.flush();
+ bufferedWriter.close();
+ log.debug("Finished writing tmp file: {}", temp);
+
+ File configFile = new File(config);
+ if (configFile.exists()) {
+ Files.copy(configFile, new File(backup));
+ configFile.delete();
+ }
+
+ tmpFile.renameTo(configFile);
+ } catch (IOException e) {
+ log.error("Failed to persist {}", temp, e);
+ } finally {
+ if (null != bufferedWriter) {
+ try {
+ bufferedWriter.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+ }
+
+ public static class Metric {
+ private AtomicLong count;
+ private long timeStamp;
+
+ public Metric() {
+ count = new AtomicLong(0);
+ timeStamp = System.currentTimeMillis();
+ }
+
+ public AtomicLong getCount() {
+ return count;
+ }
+
+ public void setCount(AtomicLong count) {
+ this.count = count;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[%d,%d]", count.get(), timeStamp);
+ }
+ }
+
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java
new file mode 100644
index 0000000000..948f9fbc8e
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+public class TransactionMetricsFlushService extends ServiceThread {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+ private BrokerController brokerController;
+ public TransactionMetricsFlushService(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ @Override
+ public String getServiceName() {
+ return "TransactionFlushService";
+ }
+
+ @Override
+ public void run() {
+ log.info(this.getServiceName() + " service start");
+ long start = System.currentTimeMillis();
+ while (!this.isStopped()) {
+ try {
+ if (System.currentTimeMillis() - start >
brokerController.getBrokerConfig().getTransactionMetricFlushInterval()) {
+ start = System.currentTimeMillis();
+
brokerController.getTransactionalMessageService().getTransactionMetrics().persist();
+
waitForRunning(brokerController.getBrokerConfig().getTransactionMetricFlushInterval());
+ }
+ } catch (Throwable e) {
+ log.error("Error occurred in " + getServiceName(), e);
+ }
+ }
+ log.info(this.getServiceName() + " service end");
+ }
+}
\ No newline at end of file
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java
index 8dbbf980eb..849e64024b 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionalMessageService.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.transaction;
import java.util.concurrent.CompletableFuture;
+
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import
org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
@@ -87,4 +88,8 @@ public interface TransactionalMessageService {
* Close transaction service.
*/
void close();
+
+ TransactionMetrics getTransactionMetrics();
+
+ void setTransactionMetrics(TransactionMetrics transactionMetrics);
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
index ad02ae4270..8e2b679b40 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
@@ -49,6 +49,8 @@ public class DefaultTransactionalMessageCheckListener extends
AbstractTransactio
if (putMessageResult != null &&
putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
log.info("Put checked-too-many-time half message to
TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " +
"commitLogOffset={}, real topic={}",
msgExt.getQueueOffset(), msgExt.getCommitLogOffset(),
msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
+ // discarded, then the num of half-messages minus 1
+
this.getBrokerController().getTransactionalMessageService().getTransactionMetrics().addAndGet(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
-1);
} else {
log.error("Put checked-too-many-time half message to
TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(),
msgExt.getMsgId());
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
index 48db828e0a..9fdfd0a710 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
@@ -27,8 +27,11 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.OperationResult;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -70,10 +73,25 @@ public class TransactionalMessageServiceImpl implements
TransactionalMessageServ
private ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new
ConcurrentHashMap<>();
+ private TransactionMetrics transactionMetrics;
+
public TransactionalMessageServiceImpl(TransactionalMessageBridge
transactionBridge) {
this.transactionalMessageBridge = transactionBridge;
transactionalOpBatchService = new
TransactionalOpBatchService(transactionalMessageBridge.getBrokerController(),
this);
transactionalOpBatchService.start();
+ transactionMetrics = new
TransactionMetrics(BrokerPathConfigHelper.getTransactionMetricsPath(
+
transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getStorePathRootDir()));
+ transactionMetrics.load();
+ }
+
+ @Override
+ public TransactionMetrics getTransactionMetrics() {
+ return transactionMetrics;
+ }
+
+ @Override
+ public void setTransactionMetrics(TransactionMetrics transactionMetrics) {
+ this.transactionMetrics = transactionMetrics;
}
@@ -632,6 +650,7 @@ public class TransactionalMessageServiceImpl implements
TransactionalMessageServ
if (this.transactionalOpBatchService != null) {
this.transactionalOpBatchService.shutdown();
}
+ this.getTransactionMetrics().persist();
}
public Message getOpMessage(int queueId, String moreData) {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index 72b339ae73..a364a1bbee 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.OperationResult;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -71,8 +72,12 @@ public class EndTransactionProcessorTest {
@Mock
private TransactionalMessageService transactionMsgService;
+ @Mock
+ private TransactionMetrics transactionMetrics;
+
@Before
public void init() {
+
when(transactionMsgService.getTransactionMetrics()).thenReturn(transactionMetrics);
brokerController.setMessageStore(messageStore);
brokerController.setTransactionalMessageService(transactionMsgService);
endTransactionProcessor = new
EndTransactionProcessor(brokerController);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
new file mode 100644
index 0000000000..690b4eabb5
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction.queue;
+
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Collections;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class TransactionMetricsTest {
+ private TransactionMetrics transactionMetrics;
+ private String configPath;
+
+ @Before
+ public void setUp() throws Exception {
+ configPath = "configPath";
+ transactionMetrics = new TransactionMetrics(configPath);
+ }
+
+ /**
+ * test addAndGet method
+ */
+ @Test
+ public void testAddAndGet() {
+ String topic = "testAddAndGet";
+ int value = 10;
+ long result = transactionMetrics.addAndGet(topic, value);
+
+ assert result == value;
+ }
+
+ @Test
+ public void testGetTopicPair() {
+ String topic = "getTopicPair";
+ Metric result = transactionMetrics.getTopicPair(topic);
+ assert result != null;
+ }
+
+ @Test
+ public void testGetTransactionCount() {
+ String topicExist = "topicExist";
+ String topicNotExist = "topicNotExist";
+
+ transactionMetrics.addAndGet(topicExist, 10);
+
+ assert transactionMetrics.getTransactionCount(topicExist) == 10;
+ assert transactionMetrics.getTransactionCount(topicNotExist) == 0;
+ }
+
+
+ /**
+ * test clean metrics
+ */
+ @Test
+ public void testCleanMetrics() {
+ String topic = "testCleanMetrics";
+ int value = 10;
+ assert transactionMetrics.addAndGet(topic, value) == value;
+ transactionMetrics.cleanMetrics(Collections.singleton(topic));
+ assert transactionMetrics.getTransactionCount(topic) == 0;
+ }
+}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
index 3cbfab2c27..2de4c307e0 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.util;
import java.util.concurrent.CompletableFuture;
import
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.OperationResult;
+import org.apache.rocketmq.broker.transaction.TransactionMetrics;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
@@ -70,4 +71,14 @@ public class TransactionalMessageServiceImpl implements
TransactionalMessageServ
public void close() {
}
+
+ @Override
+ public TransactionMetrics getTransactionMetrics() {
+ return null;
+ }
+
+ @Override
+ public void setTransactionMetrics(TransactionMetrics transactionMetrics) {
+
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index bedc7f386b..0a2c528f86 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -269,6 +269,8 @@ public class BrokerConfig extends BrokerIdentity {
@ImportantField
private long transactionCheckInterval = 30 * 1000;
+ private long transactionMetricFlushInterval = 3 * 1000;
+
/**
* transaction batch op message
*/
@@ -1789,4 +1791,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setSplitRegistrationSize(int splitRegistrationSize) {
this.splitRegistrationSize = splitRegistrationSize;
}
+
+ public long getTransactionMetricFlushInterval() {
+ return transactionMetricFlushInterval;
+ }
+
+ public void setTransactionMetricFlushInterval(long
transactionMetricFlushInterval) {
+ this.transactionMetricFlushInterval = transactionMetricFlushInterval;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
index 6c3bed47cf..5e99759619 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
@@ -16,13 +16,14 @@
*/
package org.apache.rocketmq.common;
-import java.io.IOException;
-import java.util.Map;
-
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.rocksdb.Statistics;
+
+import java.io.IOException;
+import java.util.Map;
public abstract class ConfigManager {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@@ -103,4 +104,8 @@ public abstract class ConfigManager {
public abstract String encode(final boolean prettyFormat);
public abstract void decode(final String jsonString);
+
+ public Statistics getStatistics() {
+ return rocksDBConfigManager == null ? null :
rocksDBConfigManager.getStatistics();
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
index f958bbdf0b..d1ec894685 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/RocksDBConfigManager.java
@@ -16,15 +16,16 @@
*/
package org.apache.rocketmq.common.config;
-import java.util.function.BiConsumer;
-
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;
+import java.util.function.BiConsumer;
+
public class RocksDBConfigManager {
protected static final Logger BROKER_LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -105,4 +106,12 @@ public class RocksDBConfigManager {
public void batchPutWithWal(final WriteBatch batch) throws Exception {
this.configRocksDBStorage.batchPutWithWal(batch);
}
+
+ public Statistics getStatistics() {
+ if (this.configRocksDBStorage == null) {
+ return null;
+ }
+
+ return configRocksDBStorage.getStatistics();
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java
b/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java
new file mode 100644
index 0000000000..899ac14a9a
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/metrics/NopObservableDoubleGauge.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.metrics;
+
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+
+public class NopObservableDoubleGauge implements ObservableDoubleGauge {
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
index 730469e590..f9b3e4c6fa 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.remoting.metrics;
public class RemotingMetricsConstant {
public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency";
-
public static final String LABEL_PROTOCOL_TYPE = "protocol_type";
public static final String LABEL_REQUEST_CODE = "request_code";
public static final String LABEL_RESPONSE_CODE = "response_code";
diff --git
a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 87ccb5474b..6141b778bf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -19,12 +19,17 @@ package org.apache.rocketmq.store;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
+import org.apache.rocketmq.store.metrics.RocksDBStoreMetricsManager;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
import org.apache.rocketmq.store.queue.RocksDBConsumeQueue;
@@ -166,4 +171,11 @@ public class RocksDBMessageStore extends
DefaultMessageStore {
// todo
return 0;
}
+
+ @Override
+ public void initMetrics(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier) {
+ DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier,
this);
+ // Also add some metrics for rocksdb's monitoring.
+ RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier,
this);
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
index 271604b1e5..956501c64f 100644
---
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
@@ -30,10 +30,22 @@ public class DefaultStoreMetricsConstant {
public static final String COUNTER_TIMER_ENQUEUE_TOTAL =
"rocketmq_timer_enqueue_total";
public static final String COUNTER_TIMER_DEQUEUE_TOTAL =
"rocketmq_timer_dequeue_total";
+ public static final String GAUGE_TIMER_MESSAGE_SNAPSHOT =
"rocketmq_timer_message_snapshot";
+ public static final String HISTOGRAM_DELAY_MSG_LATENCY =
"rocketmq_delay_message_latency";
public static final String LABEL_STORAGE_TYPE = "storage_type";
public static final String DEFAULT_STORAGE_TYPE = "local";
public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
public static final String DEFAULT_STORAGE_MEDIUM = "disk";
public static final String LABEL_TOPIC = "topic";
+ public static final String LABEL_TIMING_BOUND = "timer_bound_s";
+ public static final String GAUGE_BYTES_ROCKSDB_WRITTEN =
"rocketmq_rocksdb_bytes_written";
+ public static final String GAUGE_BYTES_ROCKSDB_READ =
"rocketmq_rocksdb_bytes_read";
+
+ public static final String GAUGE_TIMES_ROCKSDB_WRITTEN_SELF =
"rocketmq_rocksdb_times_written_self";
+ public static final String GAUGE_TIMES_ROCKSDB_WRITTEN_OTHER =
"rocketmq_rocksdb_times_written_other";
+ public static final String GAUGE_RATE_ROCKSDB_CACHE_HIT =
"rocketmq_rocksdb_rate_cache_hit";
+ public static final String GAUGE_TIMES_ROCKSDB_COMPRESSED =
"rocketmq_rocksdb_times_compressed";
+ public static final String GAUGE_BYTES_READ_AMPLIFICATION =
"rocketmq_rocksdb_read_amplification_bytes";
+ public static final String GAUGE_TIMES_ROCKSDB_READ =
"rocketmq_rocksdb_times_read";
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index 45a6bbc680..db4c7bb766 100644
---
a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -20,19 +20,29 @@ import com.google.common.collect.Lists;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.ViewBuilder;
-import java.io.File;
-import java.util.List;
-import java.util.function.Supplier;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.timer.Slot;
import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.apache.rocketmq.store.timer.TimerMetrics;
+import org.apache.rocketmq.store.timer.TimerWheel;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_DEQUEUE_TOTAL;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_ENQUEUE_TOTAL;
@@ -46,9 +56,12 @@ import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUG
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LATENCY;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LAG;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LATENCY;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_MESSAGE_SNAPSHOT;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMING_MESSAGES;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.HISTOGRAM_DELAY_MSG_LATENCY;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TIMING_BOUND;
import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
public class DefaultStoreMetricsManager {
@@ -68,9 +81,26 @@ public class DefaultStoreMetricsManager {
public static LongCounter timerDequeueTotal = new NopLongCounter();
public static LongCounter timerEnqueueTotal = new NopLongCounter();
+ public static ObservableLongGauge timerMessageSnapshot = new
NopObservableLongGauge();
+ public static LongHistogram timerMessageSetLatency = new
NopLongHistogram();
public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView()
{
- return Lists.newArrayList();
+ List<Double> rpcCostTimeBuckets = Arrays.asList(
+ // day * hour * min * second
+ 1d * 1 * 1 * 60, // 60 second
+ 1d * 1 * 10 * 60, // 10 min
+ 1d * 1 * 60 * 60, // 1 hour
+ 1d * 12 * 60 * 60, // 12 hour
+ 1d * 24 * 60 * 60, // 1 day
+ 3d * 24 * 60 * 60 // 3 day
+ );
+ InstrumentSelector selector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_DELAY_MSG_LATENCY)
+ .build();
+ ViewBuilder viewBuilder = View.builder()
+
.setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets));
+ return Lists.newArrayList(new Pair<>(selector, viewBuilder));
}
public static void init(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier,
@@ -168,6 +198,31 @@ public class DefaultStoreMetricsManager {
timerEnqueueTotal =
meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
.setDescription("Total number of timer enqueue")
.build();
+ timerMessageSnapshot =
meter.gaugeBuilder(GAUGE_TIMER_MESSAGE_SNAPSHOT)
+ .setDescription("Timer message distribution snapshot, only
count timing messages in 24h.")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ TimerMetrics timerMetrics =
messageStore.getTimerMessageStore().getTimerMetrics();
+ TimerWheel timerWheel =
messageStore.getTimerMessageStore().getTimerWheel();
+ int precisionMs = messageStoreConfig.getTimerPrecisionMs();
+ List<Integer> timerDist = timerMetrics.getTimerDistList();
+ long currTime = System.currentTimeMillis() / precisionMs *
precisionMs;
+ for (int i = 0; i < timerDist.size(); i++) {
+ int slotBeforeNum = i == 0 ? 0 : timerDist.get(i - 1)
* 1000 / precisionMs;
+ int slotTotalNum = timerDist.get(i) * 1000 /
precisionMs;
+ int periodTotal = 0;
+ for (int j = slotBeforeNum; j < slotTotalNum; j++) {
+ Slot slotEach = timerWheel.getSlot(currTime +
(long) j * precisionMs);
+ periodTotal += slotEach.num;
+ }
+ measurement.record(periodTotal,
newAttributesBuilder().put(LABEL_TIMING_BOUND,
timerDist.get(i).toString()).build());
+ }
+ });
+ timerMessageSetLatency =
meter.histogramBuilder(HISTOGRAM_DELAY_MSG_LATENCY)
+ .setDescription("Timer message set latency distribution")
+ .setUnit("seconds")
+ .ofLongs()
+ .build();
}
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
b/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
new file mode 100644
index 0000000000..6029488056
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopObservableDoubleGauge;
+import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
+import org.apache.rocketmq.store.RocksDBMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore;
+import org.rocksdb.TickerType;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_MEDIUM;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_TYPE;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_BYTES_ROCKSDB_READ;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_BYTES_ROCKSDB_WRITTEN;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM;
+import static
org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE;
+
+public class RocksDBStoreMetricsManager {
+ public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+ public static MessageStoreConfig messageStoreConfig;
+
+ // The cumulative number of bytes read from the database.
+ public static ObservableLongGauge bytesRocksdbRead = new
NopObservableLongGauge();
+
+ // The cumulative number of bytes written to the database.
+ public static ObservableLongGauge bytesRocksdbWritten = new
NopObservableLongGauge();
+
+ // The cumulative number of read operations performed.
+ public static ObservableLongGauge timesRocksdbRead = new
NopObservableLongGauge();
+
+ // The cumulative number of write operations performed.
+ public static ObservableLongGauge timesRocksdbWrittenSelf = new
NopObservableLongGauge();
+ public static ObservableLongGauge timesRocksdbWrittenOther = new
NopObservableLongGauge();
+
+ // The cumulative number of compressions that have occurred.
+ public static ObservableLongGauge timesRocksdbCompressed = new
NopObservableLongGauge();
+
+ // The ratio of the amount of data actually written to the storage medium
to the amount of data written by the application.
+ public static ObservableDoubleGauge bytesRocksdbAmplificationRead = new
NopObservableDoubleGauge();
+
+ // The rate at which cache lookups were served from the cache rather than
needing to be fetched from disk.
+ public static ObservableDoubleGauge rocksdbCacheHitRate = new
NopObservableDoubleGauge();
+
+ public static volatile long blockCacheHitTimes = 0;
+ public static volatile long blockCacheMissTimes = 0;
+
+
+
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView()
{
+ return Lists.newArrayList();
+ }
+
+ public static void init(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier,
+ RocksDBMessageStore messageStore) {
+ RocksDBStoreMetricsManager.attributesBuilderSupplier =
attributesBuilderSupplier;
+ bytesRocksdbWritten = meter.gaugeBuilder(GAUGE_BYTES_ROCKSDB_WRITTEN)
+ .setDescription("The cumulative number of bytes written to the
database.")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.BYTES_WRITTEN),
newAttributesBuilder().put("type", "consume_queue").build());
+ });
+ bytesRocksdbRead = meter.gaugeBuilder(GAUGE_BYTES_ROCKSDB_READ)
+ .setDescription("The cumulative number of bytes read from the
database.")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.BYTES_READ),
newAttributesBuilder().put("type", "consume_queue").build());
+ });
+ timesRocksdbWrittenSelf =
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_WRITTEN_SELF)
+ .setDescription("The cumulative number of write operations
performed by self.")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.WRITE_DONE_BY_SELF),
newAttributesBuilder().put("type", "consume_queue").build());
+ });
+ timesRocksdbWrittenOther =
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_WRITTEN_OTHER)
+ .setDescription("The cumulative number of write operations
performed by other.")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.WRITE_DONE_BY_OTHER),
newAttributesBuilder().put("type", "consume_queue").build());
+ });
+ timesRocksdbRead =
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_READ)
+ .setDescription("The cumulative number of write operations
performed by other.")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.NUMBER_KEYS_READ),
newAttributesBuilder().put("type", "consume_queue").build());
+ });
+ rocksdbCacheHitRate =
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_RATE_ROCKSDB_CACHE_HIT)
+ .setDescription("The rate at which cache lookups were served
from the cache rather than needing to be fetched from disk.")
+ .buildWithCallback(measurement -> {
+ long newHitTimes =
((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.BLOCK_CACHE_HIT);
+ long newMissTimes =
((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.BLOCK_CACHE_MISS);
+ long totalPeriod = newHitTimes - blockCacheHitTimes +
newMissTimes - blockCacheMissTimes;
+ double hitRate = totalPeriod == 0 ? 0 :
(double)(newHitTimes - blockCacheHitTimes) / totalPeriod;
+ blockCacheHitTimes = newHitTimes;
+ blockCacheMissTimes = newMissTimes;
+ measurement.record(hitRate,
newAttributesBuilder().put("type", "consume_queue").build());
+ });
+ timesRocksdbCompressed =
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_TIMES_ROCKSDB_COMPRESSED)
+ .setDescription("The cumulative number of compressions that
have occurred.")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.NUMBER_BLOCK_COMPRESSED),
newAttributesBuilder().put("type", "consume_queue").build());
+ });
+ bytesRocksdbAmplificationRead =
meter.gaugeBuilder(DefaultStoreMetricsConstant.GAUGE_BYTES_READ_AMPLIFICATION)
+ .setDescription("The rate at which cache lookups were served
from the cache rather than needing to be fetched from disk.")
+ .buildWithCallback(measurement -> {
+
measurement.record(((RocksDBConsumeQueueStore)messageStore.getQueueStore())
+
.getStatistics().getTickerCount(TickerType.READ_AMP_TOTAL_READ_BYTES),
newAttributesBuilder().put("type", "consume_queue").build());
+ });
+ }
+
+ public static AttributesBuilder newAttributesBuilder() {
+ if (attributesBuilderSupplier == null) {
+ return Attributes.builder();
+ }
+ return attributesBuilderSupplier.get()
+ .put(LABEL_STORAGE_TYPE, DEFAULT_STORAGE_TYPE)
+ .put(LABEL_STORAGE_MEDIUM, DEFAULT_STORAGE_MEDIUM);
+ }
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 78456cfcd8..4c66696e3c 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -45,6 +45,7 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage;
import org.rocksdb.RocksDBException;
+import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;
public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
@@ -266,6 +267,9 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
}
}
+ public Statistics getStatistics() {
+ return rocksDBStorage.getStatistics();
+ }
@Override
public List<ByteBuffer> rangeQuery(final String topic, final int queueId,
final long startIndex, final int num) throws RocksDBException {
return this.rocksDBConsumeQueueTable.rangeQuery(topic, queueId,
startIndex, num);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 872cd71054..819b3e96a4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import io.opentelemetry.api.common.Attributes;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.common.ServiceThread;
@@ -64,6 +65,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
@@ -686,6 +688,9 @@ public class TimerMessageStore {
return false;
}
}
+ Attributes attributes =
DefaultStoreMetricsManager.newAttributesBuilder()
+ .put(DefaultStoreMetricsConstant.LABEL_TOPIC,
msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC)).build();
+
DefaultStoreMetricsManager.timerMessageSetLatency.record((delayedTime -
msgExt.getBornTimestamp()) / 1000, attributes);
}
} catch (Exception e) {
// here may cause the message loss