This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 40271394e3 [ISSUE #8211] Add two metrics rocketmq_topic_create_execution_time and rocketmq_consumer_group_create_execution_time (#8212)
40271394e3 is described below

commit 40271394e344df631d288b169befe1a4d7001255
Author: Stephanie0002 <55239858+stephanie0...@users.noreply.github.com>
AuthorDate: Fri May 31 17:06:00 2024 +0800

    [ISSUE #8211] Add two metrics rocketmq_topic_create_execution_time and rocketmq_consumer_group_create_execution_time (#8212)
    
    * Add two metrics, createTopicTime and createSubscriptionTime, in broker
    
    * roll back BrokerConfig.java
    
    * Add metric view of createTopicTime and createSubscriptionTime in broker
    
    * Add two metrics rocketmq_active_topic_number and rocketmq_active_subscription_number
    
    Signed-off-by: 黄梓淇 <me@U-0MV57FM9-2309.local>
---
 .../broker/metrics/BrokerMetricsConstant.java      |  3 +
 .../broker/metrics/BrokerMetricsManager.java       | 43 +++++++++++
 .../rocketmq/broker/metrics/InvocationStatus.java  | 33 ++++++++
 .../broker/processor/AdminBrokerProcessor.java     | 90 +++++++++++++---------
 4 files changed, 134 insertions(+), 35 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
index 5733aa40ba..0af2ac616c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
@@ -27,6 +27,8 @@ public class BrokerMetricsConstant {
     public static final String COUNTER_THROUGHPUT_IN_TOTAL = 
"rocketmq_throughput_in_total";
     public static final String COUNTER_THROUGHPUT_OUT_TOTAL = 
"rocketmq_throughput_out_total";
     public static final String HISTOGRAM_MESSAGE_SIZE = 
"rocketmq_message_size";
+    public static final String HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME = 
"rocketmq_topic_create_execution_time";
+    public static final String HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME = 
"rocketmq_consumer_group_create_execution_time";
 
     public static final String GAUGE_PRODUCER_CONNECTIONS = 
"rocketmq_producer_connections";
     public static final String GAUGE_CONSUMER_CONNECTIONS = 
"rocketmq_consumer_connections";
@@ -52,6 +54,7 @@ public class BrokerMetricsConstant {
     public static final String LABEL_PROCESSOR = "processor";
 
     public static final String LABEL_TOPIC = "topic";
+    public static final String LABEL_INVOCATION_STATUS = "invocation_status";
     public static final String LABEL_IS_RETRY = "is_retry";
     public static final String LABEL_IS_SYSTEM = "is_system";
     public static final String LABEL_CONSUMER_GROUP = "consumer_group";
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index fc7e97bda9..0050a0dcd4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -64,6 +64,7 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -92,6 +93,8 @@ import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRO
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
 import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
@@ -135,6 +138,8 @@ public class BrokerMetricsManager {
     public static LongCounter throughputInTotal = new NopLongCounter();
     public static LongCounter throughputOutTotal = new NopLongCounter();
     public static LongHistogram messageSize = new NopLongHistogram();
+    public static LongHistogram topicCreateExecuteTime = new 
NopLongHistogram();
+    public static LongHistogram consumerGroupCreateExecuteTime = new 
NopLongHistogram();
 
     // client connection metrics
     public static ObservableLongGauge producerConnection = new 
NopObservableLongGauge();
@@ -381,6 +386,14 @@ public class BrokerMetricsManager {
                 1d * 12 * 60 * 60, //12h
                 1d * 24 * 60 * 60 //24h
         );
+
+        List<Double> createTimeBuckets = Arrays.asList(
+                (double) Duration.ofMillis(10).toMillis(), //10ms
+                (double) Duration.ofMillis(100).toMillis(), //100ms
+                (double) Duration.ofSeconds(1).toMillis(), //1s
+                (double) Duration.ofSeconds(3).toMillis(), //3s
+                (double) Duration.ofSeconds(5).toMillis() //5s
+        );
         InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
             .setType(InstrumentType.HISTOGRAM)
             .setName(HISTOGRAM_MESSAGE_SIZE)
@@ -401,6 +414,24 @@ public class BrokerMetricsManager {
         SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder, 
brokerConfig.getMetricsOtelCardinalityLimit());
         providerBuilder.registerView(commitLatencySelector, 
commitLatencyViewBuilder.build());
 
+        InstrumentSelector createTopicTimeSelector = 
InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM)
+                .setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
+                .build();
+        InstrumentSelector createSubGroupTimeSelector = 
InstrumentSelector.builder()
+                .setType(InstrumentType.HISTOGRAM)
+                .setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
+                .build();
+        ViewBuilder createTopicTimeViewBuilder = View.builder()
+                
.setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
+        ViewBuilder createSubGroupTimeViewBuilder = View.builder()
+                
.setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
+        // To config the cardinalityLimit for openTelemetry metrics exporting.
+        SdkMeterProviderUtil.setCardinalityLimit(createTopicTimeViewBuilder, 
brokerConfig.getMetricsOtelCardinalityLimit());
+        providerBuilder.registerView(createTopicTimeSelector, 
createTopicTimeViewBuilder.build());
+        
SdkMeterProviderUtil.setCardinalityLimit(createSubGroupTimeViewBuilder, 
brokerConfig.getMetricsOtelCardinalityLimit());
+        providerBuilder.registerView(createSubGroupTimeSelector, 
createSubGroupTimeViewBuilder.build());
+
         for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : 
RemotingMetricsManager.getMetricsView()) {
             ViewBuilder viewBuilder = selectorViewPair.getObject2();
             SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, 
brokerConfig.getMetricsOtelCardinalityLimit());
@@ -482,6 +513,18 @@ public class BrokerMetricsManager {
             .setDescription("Incoming messages size")
             .ofLongs()
             .build();
+
+        topicCreateExecuteTime = 
brokerMeter.histogramBuilder(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
+                .setDescription("The distribution of create topic time")
+                .ofLongs()
+                .setUnit("milliseconds")
+                .build();
+
+        consumerGroupCreateExecuteTime = 
brokerMeter.histogramBuilder(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
+                .setDescription("The distribution of create subscription time")
+                .ofLongs()
+                .setUnit("milliseconds")
+                .build();
     }
 
     private void initConnectionMetrics() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/InvocationStatus.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/InvocationStatus.java
new file mode 100644
index 0000000000..c7501e53d9
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/InvocationStatus.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.metrics;
+
+public enum InvocationStatus {
+    SUCCESS("success"),
+    FAILURE("failure");
+
+    private final String name;
+
+    InvocationStatus(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+}
\ No newline at end of file
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a1a6f5bf6c..40a7a461e8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import io.opentelemetry.api.common.Attributes;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.AccessValidator;
@@ -58,6 +59,8 @@ import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+import org.apache.rocketmq.broker.metrics.InvocationStatus;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
@@ -212,7 +215,8 @@ import org.apache.rocketmq.store.queue.ReferredIterator;
 import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.util.LibC;
-
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
+import static 
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
 import static 
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
 public class AdminBrokerProcessor implements NettyRequestProcessor {
@@ -465,45 +469,46 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
 
         String topic = requestHeader.getTopic();
 
-        TopicValidator.ValidateTopicResult result = 
TopicValidator.validateTopic(topic);
-        if (!result.isValid()) {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(result.getRemark());
-            return response;
-        }
-        if 
(brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
-            if (TopicValidator.isSystemTopic(topic)) {
+        long executionTime;
+        try {
+            TopicValidator.ValidateTopicResult result = 
TopicValidator.validateTopic(topic);
+            if (!result.isValid()) {
                 response.setCode(ResponseCode.SYSTEM_ERROR);
-                response.setRemark("The topic[" + topic + "] is conflict with 
system topic.");
+                response.setRemark(result.getRemark());
                 return response;
             }
-        }
-
-        TopicConfig topicConfig = new TopicConfig(topic);
-        topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
-        topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
-        topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
-        topicConfig.setPerm(requestHeader.getPerm());
-        topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 
0 : requestHeader.getTopicSysFlag());
-        topicConfig.setOrder(requestHeader.getOrder());
-        String attributesModification = requestHeader.getAttributes();
-        
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
+            if 
(brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
+                if (TopicValidator.isSystemTopic(topic)) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("The topic[" + topic + "] is conflict 
with system topic.");
+                    return response;
+                }
+            }
 
-        if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
-            && !brokerController.getBrokerConfig().isEnableMixedMessageType()) 
{
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("MIXED message type is not supported.");
-            return response;
-        }
+            TopicConfig topicConfig = new TopicConfig(topic);
+            topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
+            topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
+            
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
+            topicConfig.setPerm(requestHeader.getPerm());
+            topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == 
null ? 0 : requestHeader.getTopicSysFlag());
+            topicConfig.setOrder(requestHeader.getOrder());
+            String attributesModification = requestHeader.getAttributes();
+            
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
+
+            if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
+                && 
!brokerController.getBrokerConfig().isEnableMixedMessageType()) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("MIXED message type is not supported.");
+                return response;
+            }
 
-        if 
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
 {
-            LOGGER.info("Broker receive request to update or create topic={}, 
but topicConfig has  no changes , so idempotent, caller address={}",
-                requestHeader.getTopic(), 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-            response.setCode(ResponseCode.SUCCESS);
-            return response;
-        }
+            if 
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
 {
+                LOGGER.info("Broker receive request to update or create 
topic={}, but topicConfig has  no changes , so idempotent, caller address={}",
+                    requestHeader.getTopic(), 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                response.setCode(ResponseCode.SUCCESS);
+                return response;
+            }
 
-        try {
             
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
             if 
(brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
                 this.brokerController.registerSingleTopicAll(topicConfig);
@@ -517,7 +522,16 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             response.setRemark(e.getMessage());
             return response;
         }
-        long executionTime = System.currentTimeMillis() - startTime;
+        finally {
+            executionTime = System.currentTimeMillis() - startTime;
+            InvocationStatus status = response.getCode() == 
ResponseCode.SUCCESS ?
+                    InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
+            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_INVOCATION_STATUS, status.getName())
+                    .put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
+                    .build();
+            BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, 
attributes);
+        }
         LOGGER.info("executionTime of create topic:{} is {} ms" , topic, 
executionTime);
         return response;
     }
@@ -1468,6 +1482,12 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         response.setRemark(null);
         long executionTime = System.currentTimeMillis() - startTime;
         LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" 
,config.getGroupName() ,executionTime);
+        InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
+                InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
+        Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                .put(LABEL_INVOCATION_STATUS, status.getName())
+                .build();
+        
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, 
attributes);
         return response;
     }
 

Reply via email to