This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a9ed984 expose managedLedgerCache, managedLedger, loadBalance metrics
to prometheus (#6705)
a9ed984 is described below
commit a9ed984f4ccac656fe09f65009efa977cc4e033c
Author: hangc0276 <[email protected]>
AuthorDate: Sat Apr 25 14:46:41 2020 +0800
expose managedLedgerCache, managedLedger, loadBalance metrics to prometheus
(#6705)
## Motivation
The managed ledger read cache metrics are exported via
/admin/broker-stats/metrics in JSON format, which is hard to parse, collect and
display. What's more, the read cache is a very important module for message
consuming throughput and latency, so collecting and displaying the read cache
metrics is extremely urgent for Pulsar in production.
## Changes
I convert the JSON-format metrics to the Prometheus text format and export them
on the Prometheus monitoring port, so that those metrics can be displayed in
Grafana. Please help check these changes; if they are OK, I will update the
metrics documentation.
---
.../mledger/impl/ManagedLedgerMBeanImpl.java | 2 +-
.../prometheus/PrometheusMetricsGenerator.java | 74 ++++++++++++++++++++++
.../pulsar/broker/stats/PrometheusMetricsTest.java | 65 +++++++++++++++++++
site2/docs/reference-metrics.md | 49 ++++++++++++++
4 files changed, 189 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 61055b5..47062e9 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -30,7 +30,7 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
public static final long[] ENTRY_LATENCY_BUCKETS_USEC = { 500, 1_000,
5_000, 10_000, 20_000, 50_000, 100_000,
200_000, 1000_000 };
- public static final long[] ENTRY_SIZE_BUCKETS_BYTES = { 128, 512, 1024,
2084, 4096, 16_384, 102_400, 1_232_896 };
+ public static final long[] ENTRY_SIZE_BUCKETS_BYTES = { 128, 512, 1024,
2048, 4096, 16_384, 102_400, 1_232_896 };
private final ManagedLedgerImpl managedLedger;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index b225958..5796c8e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -20,10 +20,18 @@ package org.apache.pulsar.broker.stats.prometheus;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.Enumeration;
import org.apache.pulsar.broker.PulsarService;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
+
+import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
+import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
+import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import io.netty.buffer.ByteBuf;
@@ -75,12 +83,78 @@ public class PrometheusMetricsGenerator {
FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
pulsar.getConfiguration().getClusterName(), stream);
+ generateBrokerBasicMetrics(pulsar, stream);
+
out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
buf.release();
}
}
+    /**
+     * Appends the broker-level metric groups (managed ledger cache, managed ledger and
+     * load balancing) to {@code stream}; every group is announced as a gauge.
+     *
+     * @param pulsar broker service the metric generators are built from
+     * @param stream text stream the samples are written to
+     */
+    private static void generateBrokerBasicMetrics(PulsarService pulsar, SimpleTextOutputStream stream) {
+        final String cluster = pulsar.getConfiguration().getClusterName();
+        final Collector.Type gauge = Collector.Type.GAUGE;
+
+        // managedLedgerCache metrics
+        Collection<Metrics> cacheMetrics = new ManagedLedgerCacheMetrics(pulsar).generate();
+        parseMetricsToPrometheusMetrics(cacheMetrics, cluster, gauge, stream);
+
+        // managedLedger metrics
+        Collection<Metrics> ledgerMetrics = new ManagedLedgerMetrics(pulsar).generate();
+        parseMetricsToPrometheusMetrics(ledgerMetrics, cluster, gauge, stream);
+
+        // loadBalance metrics
+        Collection<Metrics> loadBalancingMetrics = pulsar.getLoadManager().get().getLoadBalancingMetrics();
+        parseMetricsToPrometheusMetrics(loadBalancingMetrics, cluster, gauge, stream);
+    }
+
+    /**
+     * Writes every entry of the given {@link Metrics} objects to {@code stream} as one
+     * Prometheus text-format sample, replacing {@code brk_} with {@code pulsar_} in the name.
+     *
+     * <p>A key that contains a '.' is treated as a bucketed value: the text after the last
+     * '_' that precedes the first '.' becomes the {@code quantile} label and the text before
+     * that '_' becomes the metric name, e.g. {@code brk_ml_EntrySizeBuckets_0.0_128.0} is
+     * written as {@code pulsar_ml_EntrySizeBuckets{..., quantile="0.0_128.0"}}. Such a key
+     * with no '_' before the '.' is skipped.
+     *
+     * <p>The {@code # TYPE} line is written once per metric name per call, and the
+     * {@code quantile} label is written exactly once per sample, after the dimensions.
+     *
+     * @param metrics    metrics to write; each object contributes its dimensions as labels
+     * @param cluster    value of the {@code cluster} label added to every sample
+     * @param metricType type announced in the {@code # TYPE} line
+     * @param stream     text stream the samples are appended to
+     */
+    private static void parseMetricsToPrometheusMetrics(Collection<Metrics> metrics, String cluster,
+                                                        Collector.Type metricType, SimpleTextOutputStream stream) {
+        // Raw metric names whose "# TYPE" line has already been written by this call.
+        Set<String> names = new HashSet<>();
+        for (Metrics metrics1 : metrics) {
+            for (Map.Entry<String, Object> entry : metrics1.getMetrics().entrySet()) {
+                String name = entry.getKey();
+                String quantile = null;
+
+                int dotIndex = name.indexOf('.');
+                if (dotIndex >= 0) {
+                    // Last '_' before the first '.' separates the metric name from the bucket range.
+                    int nameIndex = name.lastIndexOf('_', dotIndex);
+                    if (nameIndex == -1) {
+                        continue;
+                    }
+                    quantile = name.substring(nameIndex + 1);
+                    name = name.substring(0, nameIndex);
+                }
+                String prometheusName = name.replace("brk_", "pulsar_");
+
+                // Announce the type only the first time a name is seen: the same key shows up
+                // in several Metrics objects and repeated TYPE lines for one metric are invalid.
+                if (names.add(name)) {
+                    stream.write("# TYPE ").write(prometheusName).write(' ')
+                            .write(getTypeStr(metricType)).write('\n');
+                }
+
+                stream.write(prometheusName).write("{cluster=\"").write(cluster).write('"');
+                for (Map.Entry<String, String> metric : metrics1.getDimensions().entrySet()) {
+                    // "cluster" is already written above from the broker configuration.
+                    if (metric.getKey().isEmpty() || "cluster".equals(metric.getKey())) {
+                        continue;
+                    }
+                    stream.write(", ").write(metric.getKey()).write("=\"").write(metric.getValue()).write('"');
+                }
+                // Written outside the dimension loop so it appears exactly once per sample,
+                // regardless of how many dimensions the Metrics object carries.
+                if (quantile != null && !quantile.isEmpty()) {
+                    stream.write(", ").write("quantile=\"").write(quantile).write('"');
+                }
+                stream.write("} ").write(String.valueOf(entry.getValue()))
+                        .write(' ').write(System.currentTimeMillis()).write("\n");
+            }
+        }
+    }
+
private static void generateSystemMetrics(SimpleTextOutputStream stream,
String cluster) {
Enumeration<MetricFamilySamples> metricFamilySamples =
CollectorRegistry.defaultRegistry.metricFamilySamples();
while (metricFamilySamples.hasMoreElements()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index e569caf..b3246c1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -243,6 +243,71 @@ public class PrometheusMetricsTest extends BrokerTestBase {
p2.close();
}
+    /**
+     * Publishes a few messages to two topics and checks that the managed ledger cache
+     * metrics appear exactly once in the Prometheus output, labelled with the cluster name.
+     */
+    @Test
+    public void testManagedLedgerCacheStats() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        try {
+            Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+            try {
+                for (int i = 0; i < 10; i++) {
+                    String message = "my-message-" + i;
+                    p1.send(message.getBytes());
+                    p2.send(message.getBytes());
+                }
+
+                ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+                PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+                // Decode with an explicit charset so the result does not depend on the platform default.
+                String metricsStr = statsOut.toString("UTF-8");
+
+                Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+                metrics.entries().forEach(e ->
+                        System.out.println(e.getKey() + ": " + e.getValue())
+                );
+
+                List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_cache_evictions");
+                assertEquals(cm.size(), 1);
+                assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+                cm = (List<Metric>) metrics.get("pulsar_ml_cache_hits_rate");
+                assertEquals(cm.size(), 1);
+                assertEquals(cm.get(0).tags.get("cluster"), "test");
+            } finally {
+                // Close even when an assertion fails so the producer does not leak into other tests.
+                p2.close();
+            }
+        } finally {
+            p1.close();
+        }
+    }
+
+    /**
+     * Publishes a few messages to two topics in one namespace and checks that the managed
+     * ledger rate metrics appear exactly once in the Prometheus output, labelled with the
+     * cluster name and the namespace.
+     */
+    @Test
+    public void testManagedLedgerStats() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        try {
+            Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+            try {
+                for (int i = 0; i < 10; i++) {
+                    String message = "my-message-" + i;
+                    p1.send(message.getBytes());
+                    p2.send(message.getBytes());
+                }
+
+                ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+                PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+                // Decode with an explicit charset so the result does not depend on the platform default.
+                String metricsStr = statsOut.toString("UTF-8");
+
+                Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+                metrics.entries().forEach(e ->
+                        System.out.println(e.getKey() + ": " + e.getValue())
+                );
+
+                List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryBytesRate");
+                assertEquals(cm.size(), 1);
+                assertEquals(cm.get(0).tags.get("cluster"), "test");
+                assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+                cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryMessagesRate");
+                assertEquals(cm.size(), 1);
+                assertEquals(cm.get(0).tags.get("cluster"), "test");
+                assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+            } finally {
+                // Close even when an assertion fails so the producer does not leak into other tests.
+                p2.close();
+            }
+        } finally {
+            p1.close();
+        }
+    }
/**
* Hacky parsing of Prometheus text format. Sould be good enough for unit
tests
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 70c3120..11cdc21 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -100,6 +100,8 @@ Broker has the following kinds of metrics:
* [Replication metrics](#replication-metrics)
* [Topic metrics](#topic-metrics)
* [Replication metrics](#replication-metrics-1)
+* [ManagedLedgerCache metrics](#managedledgercache-metrics)
+* [ManagedLedger metrics](#managedledger-metrics)
* [LoadBalancing metrics](#loadbalancing-metrics)
* [BundleUnloading metrics](#bundleunloading-metrics)
* [BundleSplit metrics](#bundlesplit-metrics)
@@ -193,6 +195,53 @@ All the replication metrics will also be labelled with
`remoteCluster=${pulsar_r
| pulsar_replication_throughput_out | Gauge | The total throughput of the
topic replicating to remote cluster (bytes/second). |
| pulsar_replication_backlog | Gauge | The total backlog of the topic
replicating to remote cluster (messages). |
+### ManagedLedgerCache metrics
+All the ManagedLedgerCache metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name
that you configured in broker.conf.
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_ml_cache_evictions | Gauge | The number of cache evictions during the
last minute. |
+| pulsar_ml_cache_hits_rate | Gauge | The number of cache hits per second. |
+| pulsar_ml_cache_hits_throughput | Gauge | The amount of data retrieved
from the cache in bytes/s |
+| pulsar_ml_cache_misses_rate | Gauge | The number of cache misses per second |
+| pulsar_ml_cache_misses_throughput | Gauge | The amount of data that was
not retrieved from the cache (cache misses) in bytes/s |
+| pulsar_ml_cache_pool_active_allocations | Gauge | The number of currently
active allocations in direct arena |
+| pulsar_ml_cache_pool_active_allocations_huge | Gauge | The number of
currently active huge allocations in direct arena |
+| pulsar_ml_cache_pool_active_allocations_normal | Gauge | The number of
currently active normal allocations in direct arena |
+| pulsar_ml_cache_pool_active_allocations_small | Gauge | The number of
currently active small allocations in direct arena |
+| pulsar_ml_cache_pool_active_allocations_tiny | Gauge | The number of
currently active tiny allocations in direct arena |
+| pulsar_ml_cache_pool_allocated | Gauge | The total allocated memory of chunk
lists in direct arena |
+| pulsar_ml_cache_pool_used | Gauge | The total used memory of chunk lists in
direct arena |
+| pulsar_ml_cache_used_size | Gauge | The size in bytes used to store the
entries payloads |
+| pulsar_ml_count | Gauge | The number of currently opened managed ledgers |
+
+### ManagedLedger metrics
+All the managedLedger metrics are labelled with the following labels:
+- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name
that you configured in broker.conf.
+- namespace: namespace=${pulsar_namespace}. ${pulsar_namespace} is the
namespace name.
+- quantile: quantile=${quantile}. Quantile is only for `Histogram` type
metric, and represents the threshold for given Buckets.
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_ml_AddEntryBytesRate | Gauge | The bytes/s rate of messages added |
+| pulsar_ml_AddEntryErrors | Gauge | The number of addEntry requests that
failed |
+| pulsar_ml_AddEntryLatencyBuckets | Histogram | The add entry latency of a
ledger with a given quantile (threshold).<br> Available quantile: <br><ul><li>
quantile="0.0_0.5" is AddEntryLatency between (0.0ms, 0.5ms]</li>
<li>quantile="0.5_1.0" is AddEntryLatency between (0.5ms,
1.0ms]</li><li>quantile="1.0_5.0" is AddEntryLatency between (1ms,
5ms]</li><li>quantile="5.0_10.0" is AddEntryLatency between (5ms,
10ms]</li><li>quantile="10.0_20.0" is AddEntryLatency between (10ms,
20ms]</li>< [...]
+| pulsar_ml_AddEntryLatencyBuckets_OVERFLOW | Gauge | The add entry latency >
1s |
+| pulsar_ml_AddEntryMessagesRate | Gauge | The msg/s rate of messages added |
+| pulsar_ml_AddEntrySucceed | Gauge | The number of addEntry requests that
succeeded |
+| pulsar_ml_EntrySizeBuckets | Histogram | The add entry size of a ledger with
given quantile.<br> Available quantile: <br><ul><li>quantile="0.0_128.0" is
EntrySize between (0byte, 128byte]</li><li>quantile="128.0_512.0" is EntrySize
between (128byte, 512byte]</li><li>quantile="512.0_1024.0" is EntrySize between
(512byte, 1KB]</li><li>quantile="1024.0_2048.0" is EntrySize between (1KB,
2KB]</li><li>quantile="2048.0_4096.0" is EntrySize between (2KB,
4KB]</li><li>quantile="4096.0_16384.0" [...]
+| pulsar_ml_EntrySizeBuckets_OVERFLOW | Gauge | The add entry size > 1MB |
+| pulsar_ml_LedgerSwitchLatencyBuckets | Histogram | The ledger switch latency
with given quantile. <br> Available quantile: <br><ul><li>quantile="0.0_0.5" is
LedgerSwitchLatency between (0ms, 0.5ms]</li><li>quantile="0.5_1.0" is LedgerSwitchLatency between
(0.5ms, 1ms]</li><li>quantile="1.0_5.0" is LedgerSwitchLatency between (1ms,
5ms]</li><li>quantile="5.0_10.0" is LedgerSwitchLatency between (5ms,
10ms]</li><li>quantile="10.0_20.0" is LedgerSwitchLatency between (10ms,
20ms]</li><li>quantile="20.0_50.0" is LedgerSwitchLatency between (20ms, 5 [...]
+| pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW | Gauge | The ledger switch
latency > 1s |
+| pulsar_ml_MarkDeleteRate | Gauge | The rate of mark-delete ops/s |
+| pulsar_ml_NumberOfMessagesInBacklog | Gauge | The number of backlog messages
for all the consumers |
+| pulsar_ml_ReadEntriesBytesRate | Gauge | The bytes/s rate of messages read |
+| pulsar_ml_ReadEntriesErrors | Gauge | The number of readEntries requests
that failed |
+| pulsar_ml_ReadEntriesRate | Gauge | The msg/s rate of messages read |
+| pulsar_ml_ReadEntriesSucceeded | Gauge | The number of readEntries requests
that succeeded |
+| pulsar_ml_StoredMessagesSize | Gauge | The total size of the messages in
active ledgers (accounting for the multiple copies stored) |
+
### LoadBalancing metrics
All the loadbalancing metrics are labelled with the following labels:
- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name
that you configured in broker.conf.