This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 80c5791b874 [improve][broker]PIP-214 Add broker level metrics
statistics and expose to prometheus (#19047)
80c5791b874 is described below
commit 80c5791b87482bee3392308ecef45f455f8de885
Author: yangyijun <[email protected]>
AuthorDate: Fri Mar 17 01:34:54 2023 +0800
[improve][broker]PIP-214 Add broker level metrics statistics and expose to
prometheus (#19047)
---
.../stats/prometheus/AggregatedBrokerStats.java | 67 +++++++++++++
.../stats/prometheus/NamespaceStatsAggregator.java | 48 ++++++----
.../pulsar/broker/stats/PrometheusMetricsTest.java | 105 +++++++++++++++++++--
.../broker/stats/TransactionMetricsTest.java | 4 +-
4 files changed, 197 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
new file mode 100644
index 00000000000..00c6cecdbfc
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+public class AggregatedBrokerStats { // Mutable, unsynchronized accumulator of broker-wide totals summed from TopicStats.
+ public int topicsCount; // incremented once per updateStats() call
+ public int subscriptionsCount; // running sum of TopicStats.subscriptionsCount
+ public int producersCount; // running sum of TopicStats.producersCount
+ public int consumersCount; // running sum of TopicStats.consumersCount
+ public double rateIn; // running sum of TopicStats.rateIn
+ public double rateOut; // running sum of TopicStats.rateOut
+ public double throughputIn; // running sum of TopicStats.throughputIn
+ public double throughputOut; // running sum of TopicStats.throughputOut
+ public long storageSize; // running sum of TopicStats.managedLedgerStats.storageSize
+ public long storageLogicalSize; // running sum of TopicStats.managedLedgerStats.storageLogicalSize
+ public double storageWriteRate; // running sum of TopicStats.managedLedgerStats.storageWriteRate
+ public double storageReadRate; // running sum of TopicStats.managedLedgerStats.storageReadRate
+ public long msgBacklog; // running sum of TopicStats.msgBacklog
+
+ void updateStats(TopicStats stats) { // Adds one TopicStats to the totals; stats and stats.managedLedgerStats are dereferenced unchecked.
+ topicsCount++;
+ subscriptionsCount += stats.subscriptionsCount;
+ producersCount += stats.producersCount;
+ consumersCount += stats.consumersCount;
+ rateIn += stats.rateIn;
+ rateOut += stats.rateOut;
+ throughputIn += stats.throughputIn;
+ throughputOut += stats.throughputOut;
+ storageSize += stats.managedLedgerStats.storageSize;
+ storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
+ storageWriteRate += stats.managedLedgerStats.storageWriteRate;
+ storageReadRate += stats.managedLedgerStats.storageReadRate;
+ msgBacklog += stats.msgBacklog;
+ }
+
+ public void reset() { // Zeroes every total so a reused instance starts a new aggregation from scratch.
+ topicsCount = 0;
+ subscriptionsCount = 0;
+ producersCount = 0;
+ consumersCount = 0;
+ rateIn = 0;
+ rateOut = 0;
+ throughputIn = 0;
+ throughputOut = 0;
+ storageSize = 0;
+ storageLogicalSize = 0;
+ storageWriteRate = 0;
+ storageReadRate = 0;
+ msgBacklog = 0;
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index d287bf89d6c..918aef539cf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -45,6 +45,14 @@ import org.apache.pulsar.compaction.CompactorMXBean;
@Slf4j
public class NamespaceStatsAggregator {
+ private static final FastThreadLocal<AggregatedBrokerStats>
localBrokerStats =
+ new FastThreadLocal<>() { // one AggregatedBrokerStats is created lazily and then reused; reset() it before each use
+ @Override
+ protected AggregatedBrokerStats initialValue() {
+ return new AggregatedBrokerStats();
+ }
+ };
+
private static final FastThreadLocal<AggregatedNamespaceStats>
localNamespaceStats =
new FastThreadLocal<>() {
@Override
@@ -64,14 +72,13 @@ public class NamespaceStatsAggregator {
boolean includeProducerMetrics, boolean
splitTopicAndPartitionIndexLabel,
PrometheusMetricStreams stream) {
String cluster = pulsar.getConfiguration().getClusterName();
+ AggregatedBrokerStats brokerStats = localBrokerStats.get();
+ brokerStats.reset();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats topicStats = localTopicStats.get();
Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
LongAdder topicsCount = new LongAdder();
Map<String, Long> localNamespaceTopicCount = new HashMap<>();
-
- printDefaultBrokerStats(stream, cluster);
-
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace,
bundlesMap) -> {
namespaceStats.reset();
topicsCount.reset();
@@ -83,6 +90,8 @@ public class NamespaceStatsAggregator {
compactorMXBean
);
+ brokerStats.updateStats(topicStats);
+
if (includeTopicMetrics) {
topicsCount.add(1);
TopicStats.printTopicStats(stream, topicStats,
compactorMXBean, cluster, namespace, name,
@@ -104,6 +113,8 @@ public class NamespaceStatsAggregator {
if (includeTopicMetrics) {
printTopicsCountStats(stream, localNamespaceTopicCount, cluster);
}
+
+ printBrokerStats(stream, cluster, brokerStats);
}
private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService
pulsar) {
@@ -301,22 +312,23 @@ public class NamespaceStatsAggregator {
});
}
- private static void printDefaultBrokerStats(PrometheusMetricStreams
stream, String cluster) {
- // Print metrics with 0 values. This is necessary to have the
available brokers being
+ private static void printBrokerStats(PrometheusMetricStreams stream,
String cluster,
+ AggregatedBrokerStats brokerStats) { // writes each brokerStats total under a pulsar_broker_* name via writeMetric
+ // Always print the broker totals (zeros if no topics). This is necessary to have the available
brokers being
// reported in the brokers dashboard even if they don't have any topic
or traffic
- writeMetric(stream, "pulsar_topics_count", 0, cluster);
- writeMetric(stream, "pulsar_subscriptions_count", 0, cluster);
- writeMetric(stream, "pulsar_producers_count", 0, cluster);
- writeMetric(stream, "pulsar_consumers_count", 0, cluster);
- writeMetric(stream, "pulsar_rate_in", 0, cluster);
- writeMetric(stream, "pulsar_rate_out", 0, cluster);
- writeMetric(stream, "pulsar_throughput_in", 0, cluster);
- writeMetric(stream, "pulsar_throughput_out", 0, cluster);
- writeMetric(stream, "pulsar_storage_size", 0, cluster);
- writeMetric(stream, "pulsar_storage_logical_size", 0, cluster);
- writeMetric(stream, "pulsar_storage_write_rate", 0, cluster);
- writeMetric(stream, "pulsar_storage_read_rate", 0, cluster);
- writeMetric(stream, "pulsar_msg_backlog", 0, cluster);
+ writeMetric(stream, "pulsar_broker_topics_count",
brokerStats.topicsCount, cluster);
+ writeMetric(stream, "pulsar_broker_subscriptions_count",
brokerStats.subscriptionsCount, cluster);
+ writeMetric(stream, "pulsar_broker_producers_count",
brokerStats.producersCount, cluster);
+ writeMetric(stream, "pulsar_broker_consumers_count",
brokerStats.consumersCount, cluster);
+ writeMetric(stream, "pulsar_broker_rate_in", brokerStats.rateIn,
cluster);
+ writeMetric(stream, "pulsar_broker_rate_out", brokerStats.rateOut,
cluster);
+ writeMetric(stream, "pulsar_broker_throughput_in",
brokerStats.throughputIn, cluster);
+ writeMetric(stream, "pulsar_broker_throughput_out",
brokerStats.throughputOut, cluster);
+ writeMetric(stream, "pulsar_broker_storage_size",
brokerStats.storageSize, cluster);
+ writeMetric(stream, "pulsar_broker_storage_logical_size",
brokerStats.storageLogicalSize, cluster);
+ writeMetric(stream, "pulsar_broker_storage_write_rate",
brokerStats.storageWriteRate, cluster);
+ writeMetric(stream, "pulsar_broker_storage_read_rate",
brokerStats.storageReadRate, cluster);
+ writeMetric(stream, "pulsar_broker_msg_backlog",
brokerStats.msgBacklog, cluster);
}
private static void printTopicsCountStats(PrometheusMetricStreams stream,
Map<String, Long> namespaceTopicsCount,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 7eb6afb97cd..13e67762ace 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -319,11 +319,11 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
cm = (List<Metric>) metrics.get("pulsar_producers_count");
- assertEquals(cm.size(), 3);
- assertEquals(cm.get(1).tags.get("topic"),
"persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(1).tags.get("topic"),
"persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(1).tags.get("topic"),
"persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
- assertEquals(cm.get(2).tags.get("topic"),
"persistent://my-property/use/my-ns/my-topic1");
- assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
cm = (List<Metric>) metrics.get("topic_load_times_count");
assertEquals(cm.size(), 1);
@@ -367,6 +367,97 @@ public class PrometheusMetricsTest extends BrokerTestBase {
c2.close();
}
+ @Test
+ public void testPerBrokerStats() throws Exception { // Checks each pulsar_broker_* metric appears once with cluster tag "test".
+ Producer<byte[]> p1 =
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+ Producer<byte[]> p2 =
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+
+ Consumer<byte[]> c1 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("test")
+ .subscribe();
+
+ Consumer<byte[]> c2 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic2")
+ .subscriptionName("test")
+ .subscribe();
+
+ final int messages = 10;
+
+ for (int i = 0; i < messages; i++) { // publish the same payloads to both topics
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ p2.send(message.getBytes());
+ }
+
+ for (int i = 0; i < messages; i++) { // receive and acknowledge every message on both topics before sampling metrics
+ c1.acknowledge(c1.receive());
+ c2.acknowledge(c2.receive());
+ }
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, false, // NOTE(review): flags presumably topic/consumer/producer metrics - confirm
statsOut);
+ String metricsStr = statsOut.toString();
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+ Collection<Metric> brokerMetrics =
metrics.get("pulsar_broker_topics_count");
+ assertEquals(brokerMetrics.size(), 1); // NOTE(review): here and below only sample count and cluster tag are asserted, not the value
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_subscriptions_count");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_producers_count");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_consumers_count");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_rate_in");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_rate_out");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_throughput_in");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_throughput_out");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_storage_size");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_storage_logical_size");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_storage_write_rate");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_storage_read_rate");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ brokerMetrics = metrics.get("pulsar_broker_msg_backlog");
+ assertEquals(brokerMetrics.size(), 1);
+
assertEquals(brokerMetrics.stream().toList().get(0).tags.get("cluster"),
"test");
+
+ p1.close(); // NOTE(review): these closes are not reached if an assertion above fails; no try/finally guards them
+ p2.close();
+ c1.close();
+ c2.close();
+ }
+
/**
* Test that the total message and byte counts for a topic are not reset
when a consumer disconnects.
*
@@ -674,9 +765,9 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
cm = (List<Metric>) metrics.get("pulsar_producers_count");
- assertEquals(cm.size(), 2);
- assertNull(cm.get(1).tags.get("topic"));
- assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.size(), 1);
+ assertNull(cm.get(0).tags.get("topic"));
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
assertEquals(cm.size(), 1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 30aedc02253..4d38f5fad51 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -293,9 +293,9 @@ public class TransactionMetricsTest extends BrokerTestBase {
metricsStr = statsOut.toString();
metrics = parseMetrics(metricsStr);
metric = metrics.get("pulsar_storage_size");
- assertEquals(metric.size(), 3);
+ assertEquals(metric.size(), 2);
metric = metrics.get("pulsar_storage_logical_size");
- assertEquals(metric.size(), 3);
+ assertEquals(metric.size(), 2);
metric = metrics.get("pulsar_storage_backlog_size");
assertEquals(metric.size(), 2);
}