This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 80c5791b874 [improve][broker]PIP-214 Add broker level metrics statistics and expose to prometheus (#19047)
80c5791b874 is described below

commit 80c5791b87482bee3392308ecef45f455f8de885
Author: yangyijun <[email protected]>
AuthorDate: Fri Mar 17 01:34:54 2023 +0800

    [improve][broker]PIP-214 Add broker level metrics statistics and expose to prometheus (#19047)
---
 .../stats/prometheus/AggregatedBrokerStats.java    |  67 +++++++++++++
 .../stats/prometheus/NamespaceStatsAggregator.java |  48 ++++++----
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 105 +++++++++++++++++++--
 .../broker/stats/TransactionMetricsTest.java       |   4 +-
 4 files changed, 197 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
new file mode 100644
index 00000000000..00c6cecdbfc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+public class AggregatedBrokerStats {
+    public int topicsCount;
+    public int subscriptionsCount;
+    public int producersCount;
+    public int consumersCount;
+    public double rateIn;
+    public double rateOut;
+    public double throughputIn;
+    public double throughputOut;
+    public long storageSize;
+    public long storageLogicalSize;
+    public double storageWriteRate;
+    public double storageReadRate;
+    public long msgBacklog;
+
+    void updateStats(TopicStats stats) {
+        topicsCount++;
+        subscriptionsCount += stats.subscriptionsCount;
+        producersCount += stats.producersCount;
+        consumersCount += stats.consumersCount;
+        rateIn += stats.rateIn;
+        rateOut += stats.rateOut;
+        throughputIn += stats.throughputIn;
+        throughputOut += stats.throughputOut;
+        storageSize += stats.managedLedgerStats.storageSize;
+        storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
+        storageWriteRate += stats.managedLedgerStats.storageWriteRate;
+        storageReadRate += stats.managedLedgerStats.storageReadRate;
+        msgBacklog += stats.msgBacklog;
+    }
+
+    public void reset() {
+        topicsCount = 0;
+        subscriptionsCount = 0;
+        producersCount = 0;
+        consumersCount = 0;
+        rateIn = 0;
+        rateOut = 0;
+        throughputIn = 0;
+        throughputOut = 0;
+        storageSize = 0;
+        storageLogicalSize = 0;
+        storageWriteRate = 0;
+        storageReadRate = 0;
+        msgBacklog = 0;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index d287bf89d6c..918aef539cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -45,6 +45,14 @@ import org.apache.pulsar.compaction.CompactorMXBean;
 @Slf4j
 public class NamespaceStatsAggregator {
 
+    private static final FastThreadLocal<AggregatedBrokerStats> 
localBrokerStats =
+            new FastThreadLocal<>() {
+                @Override
+                protected AggregatedBrokerStats initialValue() {
+                    return new AggregatedBrokerStats();
+                }
+            };
+
     private static final FastThreadLocal<AggregatedNamespaceStats> 
localNamespaceStats =
             new FastThreadLocal<>() {
                 @Override
@@ -64,14 +72,13 @@ public class NamespaceStatsAggregator {
                                 boolean includeProducerMetrics, boolean 
splitTopicAndPartitionIndexLabel,
                                 PrometheusMetricStreams stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
+        AggregatedBrokerStats brokerStats = localBrokerStats.get();
+        brokerStats.reset();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
         TopicStats topicStats = localTopicStats.get();
         Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
         LongAdder topicsCount = new LongAdder();
         Map<String, Long> localNamespaceTopicCount = new HashMap<>();
-
-        printDefaultBrokerStats(stream, cluster);
-
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, 
bundlesMap) -> {
             namespaceStats.reset();
             topicsCount.reset();
@@ -83,6 +90,8 @@ public class NamespaceStatsAggregator {
                         compactorMXBean
                 );
 
+                brokerStats.updateStats(topicStats);
+
                 if (includeTopicMetrics) {
                     topicsCount.add(1);
                     TopicStats.printTopicStats(stream, topicStats, 
compactorMXBean, cluster, namespace, name,
@@ -104,6 +113,8 @@ public class NamespaceStatsAggregator {
         if (includeTopicMetrics) {
             printTopicsCountStats(stream, localNamespaceTopicCount, cluster);
         }
+
+        printBrokerStats(stream, cluster, brokerStats);
     }
 
     private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService 
pulsar) {
@@ -301,22 +312,23 @@ public class NamespaceStatsAggregator {
                 });
     }
 
-    private static void printDefaultBrokerStats(PrometheusMetricStreams 
stream, String cluster) {
-        // Print metrics with 0 values. This is necessary to have the 
available brokers being
+    private static void printBrokerStats(PrometheusMetricStreams stream, 
String cluster,
+                                         AggregatedBrokerStats brokerStats) {
+        // Print metrics values. This is necessary to have the available 
brokers being
         // reported in the brokers dashboard even if they don't have any topic 
or traffic
-        writeMetric(stream, "pulsar_topics_count", 0, cluster);
-        writeMetric(stream, "pulsar_subscriptions_count", 0, cluster);
-        writeMetric(stream, "pulsar_producers_count", 0, cluster);
-        writeMetric(stream, "pulsar_consumers_count", 0, cluster);
-        writeMetric(stream, "pulsar_rate_in", 0, cluster);
-        writeMetric(stream, "pulsar_rate_out", 0, cluster);
-        writeMetric(stream, "pulsar_throughput_in", 0, cluster);
-        writeMetric(stream, "pulsar_throughput_out", 0, cluster);
-        writeMetric(stream, "pulsar_storage_size", 0, cluster);
-        writeMetric(stream, "pulsar_storage_logical_size", 0, cluster);
-        writeMetric(stream, "pulsar_storage_write_rate", 0, cluster);
-        writeMetric(stream, "pulsar_storage_read_rate", 0, cluster);
-        writeMetric(stream, "pulsar_msg_backlog", 0, cluster);
+        writeMetric(stream, "pulsar_broker_topics_count", 
brokerStats.topicsCount, cluster);
+        writeMetric(stream, "pulsar_broker_subscriptions_count", 
brokerStats.subscriptionsCount, cluster);
+        writeMetric(stream, "pulsar_broker_producers_count", 
brokerStats.producersCount, cluster);
+        writeMetric(stream, "pulsar_broker_consumers_count", 
brokerStats.consumersCount, cluster);
+        writeMetric(stream, "pulsar_broker_rate_in", brokerStats.rateIn, 
cluster);
+        writeMetric(stream, "pulsar_broker_rate_out", brokerStats.rateOut, 
cluster);
+        writeMetric(stream, "pulsar_broker_throughput_in", 
brokerStats.throughputIn, cluster);
+        writeMetric(stream, "pulsar_broker_throughput_out", 
brokerStats.throughputOut, cluster);
+        writeMetric(stream, "pulsar_broker_storage_size", 
brokerStats.storageSize, cluster);
+        writeMetric(stream, "pulsar_broker_storage_logical_size", 
brokerStats.storageLogicalSize, cluster);
+        writeMetric(stream, "pulsar_broker_storage_write_rate", 
brokerStats.storageWriteRate, cluster);
+        writeMetric(stream, "pulsar_broker_storage_read_rate", 
brokerStats.storageReadRate, cluster);
+        writeMetric(stream, "pulsar_broker_msg_backlog", 
brokerStats.msgBacklog, cluster);
     }
 
     private static void printTopicsCountStats(PrometheusMetricStreams stream, 
Map<String, Long> namespaceTopicsCount,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 7eb6afb97cd..13e67762ace 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -319,11 +319,11 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("pulsar_producers_count");
-        assertEquals(cm.size(), 3);
-        assertEquals(cm.get(1).tags.get("topic"), 
"persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(1).tags.get("topic"), 
"persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("topic"), 
"persistent://my-property/use/my-ns/my-topic1");
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
-        assertEquals(cm.get(2).tags.get("topic"), 
"persistent://my-property/use/my-ns/my-topic1");
-        assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("topic_load_times_count");
         assertEquals(cm.size(), 1);
@@ -367,6 +367,97 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         c2.close();
     }
 
+    @Test
+    public void testPerBrokerStats() throws Exception {
+        Producer<byte[]> p1 = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        Collection<Metric> brokerMetrics = 
metrics.get("pulsar_broker_topics_count");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_subscriptions_count");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_producers_count");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_consumers_count");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_rate_in");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_rate_out");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_throughput_in");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_throughput_out");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_storage_size");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_storage_logical_size");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_storage_write_rate");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_storage_read_rate");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        brokerMetrics = metrics.get("pulsar_broker_msg_backlog");
+        assertEquals(brokerMetrics.size(), 1);
+        
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"), 
"test");
+
+        p1.close();
+        p2.close();
+        c1.close();
+        c2.close();
+    }
+
     /**
      * Test that the total message and byte counts for a topic are not reset 
when a consumer disconnects.
      *
@@ -674,9 +765,9 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("pulsar_producers_count");
-        assertEquals(cm.size(), 2);
-        assertNull(cm.get(1).tags.get("topic"));
-        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.size(), 1);
+        assertNull(cm.get(0).tags.get("topic"));
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
 
         cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
         assertEquals(cm.size(), 1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 30aedc02253..4d38f5fad51 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -293,9 +293,9 @@ public class TransactionMetricsTest extends BrokerTestBase {
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
         metric = metrics.get("pulsar_storage_size");
-        assertEquals(metric.size(), 3);
+        assertEquals(metric.size(), 2);
         metric = metrics.get("pulsar_storage_logical_size");
-        assertEquals(metric.size(), 3);
+        assertEquals(metric.size(), 2);
         metric = metrics.get("pulsar_storage_backlog_size");
         assertEquals(metric.size(), 2);
     }

Reply via email to