This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d91dfb Add some metrics to prometheus. (#2299)
8d91dfb is described below
commit 8d91dfb449ec1be54a7f2fad2e02ea2212bdb1a4
Author: penghui <[email protected]>
AuthorDate: Fri Aug 3 18:04:12 2018 +0800
Add some metrics to prometheus. (#2299)
* Add some metrics to prometheus.
1.pulsar_subscription_back_log
2.pulsar_subscription_msg_rate_redeliver
3.pulsar_subscription_unacked_massages
4.pulsar_subscription_blocked_on_unacked_messages
5.pulsar_consumer_msg_rate_redeliver
6.pulsar_consumer_unacked_massages
7.pulsar_consumer_blocked_on_unacked_messages
* Add some metrics to prometheus.
1.pulsar_subscription_back_log
2.pulsar_subscription_msg_rate_redeliver
3.pulsar_subscription_unacked_massages
4.pulsar_subscription_blocked_on_unacked_messages
5.pulsar_consumer_msg_rate_redeliver
6.pulsar_consumer_unacked_massages
7.pulsar_consumer_blocked_on_unacked_messages
---
conf/broker.conf | 3 +
docker/grafana/dashboards/topic.json | 1160 ++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 9 +
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../stats/prometheus/AggregatedConsumerStats.java | 34 +
.../stats/prometheus/AggregatedNamespaceStats.java | 20 +
.../prometheus/AggregatedSubscriptionStats.java | 41 +
.../stats/prometheus/NamespaceStatsAggregator.java | 41 +-
.../prometheus/PrometheusMetricsGenerator.java | 4 +-
.../stats/prometheus/PrometheusMetricsServlet.java | 6 +-
.../pulsar/broker/stats/prometheus/TopicStats.java | 53 +-
.../pulsar/broker/stats/PrometheusMetricsTest.java | 4 +-
12 files changed, 1360 insertions(+), 17 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index a639b85..5f4a4e9 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -479,6 +479,9 @@ webSocketSessionIdleTimeoutMillis=300000
# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true
+# Enable consumer-level metrics in Prometheus (default: false)
+# exposeConsumerLevelMetricsInPrometheus=false
+
### --- Functions --- ###
# Enable Functions Worker Service in Broker
diff --git a/docker/grafana/dashboards/topic.json
b/docker/grafana/dashboards/topic.json
new file mode 100644
index 0000000..d53a018
--- /dev/null
+++ b/docker/grafana/dashboards/topic.json
@@ -0,0 +1,1160 @@
+{
+ "__inputs": [
+ {
+ "name": "DS_default",
+ "label": "default",
+ "description": "",
+ "type": "datasource",
+ "pluginId": "prometheus",
+ "pluginName": "Prometheus"
+ }
+ ],
+ "__requires": [
+ {
+ "type": "grafana",
+ "id": "grafana",
+ "name": "Grafana",
+ "version": "5.1.0"
+ },
+ {
+ "type": "panel",
+ "id": "graph",
+ "name": "Graph",
+ "version": "5.0.0"
+ },
+ {
+ "type": "datasource",
+ "id": "prometheus",
+ "name": "Prometheus",
+ "version": "5.0.0"
+ }
+ ],
+ "annotations": {
+ "list": [
+ {
+ "builtIn": 1,
+ "datasource": "-- Grafana --",
+ "enable": true,
+ "hide": true,
+ "iconColor": "rgba(0, 211, 255, 1)",
+ "name": "Annotations & Alerts",
+ "type": "dashboard"
+ }
+ ]
+ },
+ "editable": true,
+ "gnetId": null,
+ "graphTooltip": 0,
+ "id": null,
+ "iteration": 1533285490117,
+ "links": [],
+ "panels": [
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 0,
+ "y": 0
+ },
+ "id": 16,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "hideEmpty": false,
+ "max": false,
+ "min": false,
+ "show": false,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "pulsar_rate_in{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}",
+ "format": "time_series",
+ "hide": false,
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "{{cluster}} - {{namespace}}",
+ "metric": "pulsar_rate_in",
+ "refId": "A",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Local publish rate",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "msg/s",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 12,
+ "y": 0
+ },
+ "id": 2,
+ "legend": {
+ "alignAsTable": false,
+ "avg": false,
+ "current": true,
+ "max": false,
+ "min": false,
+ "rightSide": false,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "pulsar_subscription_msg_rate_out{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "legendFormat": "{{subscription}}",
+ "metric": "pulsar_rate_out",
+ "refId": "A",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Local delivery rate",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "msg / s",
+ "logBase": 1,
+ "max": null,
+ "min": "0",
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 0,
+ "y": 7
+ },
+ "id": 5,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": false,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "pulsar_throughput_in{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "{{cluster}} - {{namespace}}",
+ "metric": "pulsar_throughput_in",
+ "refId": "A",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Local publish throughput (bytes/s)",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "Bps",
+ "label": "",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "description": "",
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 12,
+ "y": 7
+ },
+ "id": 8,
+ "legend": {
+ "alignAsTable": false,
+ "avg": false,
+ "current": true,
+ "max": false,
+ "min": false,
+ "rightSide": false,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr":
"pulsar_subscription_msg_throughput_out{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "{{subscription}}",
+ "metric": "pulsar_throughput_out",
+ "refId": "A",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Local delivery throughput (bytes/s)",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "Bps",
+ "label": "",
+ "logBase": 1,
+ "max": null,
+ "min": "0",
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "decimals": 0,
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 0,
+ "y": 14
+ },
+ "id": 7,
+ "legend": {
+ "alignAsTable": false,
+ "avg": false,
+ "current": true,
+ "max": false,
+ "min": false,
+ "rightSide": false,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(pulsar_producers_count{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"})",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "producers",
+ "metric": "pulsar_producers_count",
+ "refId": "A",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_subscriptions_count{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"})",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "legendFormat": "subscriptions",
+ "metric": "pulsar_subscriptions_count",
+ "refId": "B",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_consumers_count{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"})",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "legendFormat": "consumers",
+ "metric": "pulsar_consumers_count",
+ "refId": "C",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Producers - Subscriptions - Consumers",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": [
+ "current"
+ ]
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "count",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 12,
+ "y": 14
+ },
+ "id": 4,
+ "legend": {
+ "alignAsTable": false,
+ "avg": false,
+ "current": true,
+ "max": false,
+ "min": false,
+ "rightSide": false,
+ "show": true,
+ "total": false,
+ "values": true
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "pulsar_subscription_back_log{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "{{subscription}}",
+ "metric": "pulsar_msg_backlog",
+ "refId": "A",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Local backlog",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "Messages",
+ "logBase": 1,
+ "max": null,
+ "min": "0",
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {
+ "0 - 0.5 ms": "#2F575E",
+ "0.5 - 1 ms": "#3F6833",
+ "1 - 5 ms": "#629E51",
+ "10 - 20 ms": "#E5A8E2",
+ "100 - 200 ms": "#EF843C",
+ "20 - 50 ms": "#65C5DB",
+ "200 ms - 1 s": "#EA6460",
+ "5 - 10 ms": "#1F78C1",
+ "50 - 100 ms": "#E5AC0E",
+ "< +Inf ms": "#BF1B00",
+ "< 0.5 ms": "#508642",
+ "> 1 s": "#BF1B00"
+ },
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "fill": 5,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 0,
+ "y": 21
+ },
+ "id": 3,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 0,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [
+ {
+ "alias": "< 100 ms",
+ "yaxis": 1
+ }
+ ],
+ "spaceLength": 10,
+ "stack": true,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_0_5{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "0 - 0.5 ms",
+ "metric": "pulsar_add_entry_latency_le_0_5",
+ "refId": "A",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_1{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "0.5 - 1 ms",
+ "metric": "pulsar_add_entry_latency_le_1",
+ "refId": "B",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_5{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "1 - 5 ms",
+ "metric": "pulsar_add_entry_latency_le_5",
+ "refId": "C",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_10{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "5 - 10 ms",
+ "metric": "pulsar_add_entry_latency_le_10",
+ "refId": "D",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_20{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "10 - 20 ms",
+ "metric": "pulsar_add_entry_latency_le_20",
+ "refId": "E",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_50{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "20 - 50 ms",
+ "metric": "pulsar_add_entry_latency_le_50",
+ "refId": "F",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_100{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "hide": false,
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "50 - 100 ms",
+ "metric": "pulsar_add_entry_latency_le_100",
+ "refId": "G",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_200{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "legendFormat": "100 - 200 ms",
+ "metric": "pulsar_add_entry_latency_le_200",
+ "refId": "H",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_le_1000{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "intervalFactor": 2,
+ "legendFormat": "200 ms - 1 s",
+ "metric": "pulsar_add_entry_latency_le_1000",
+ "refId": "I",
+ "step": 10
+ },
+ {
+ "expr":
"sum(pulsar_storage_write_latency_overflow{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "> 1 s",
+ "metric": "pulsar_add_entry_latency_overflow",
+ "refId": "J",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Storage Write Latency",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "msg / s",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "fill": 1,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 12,
+ "y": 21
+ },
+ "id": 9,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": false,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 1,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(pulsar_storage_size{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"})",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "$namespace",
+ "metric": "pulsar_storage_size",
+ "refId": "A",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Storage Size",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "decbytes",
+ "label": "",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "${DS_default}",
+ "fill": 6,
+ "gridPos": {
+ "h": 7,
+ "w": 12,
+ "x": 0,
+ "y": 28
+ },
+ "id": 12,
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": true,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 0,
+ "links": [],
+ "nullPointMode": "null",
+ "percentage": false,
+ "pointradius": 5,
+ "points": false,
+ "renderer": "flot",
+ "seriesOverrides": [
+ {
+ "alias": "< 2 KB",
+ "yaxis": 1
+ }
+ ],
+ "spaceLength": 10,
+ "stack": true,
+ "steppedLine": false,
+ "targets": [
+ {
+ "expr": "sum(pulsar_entry_size_le_128{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "< 128 bytes",
+ "metric": "pulsar_entry_size_le_128",
+ "refId": "A",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_entry_size_le_512{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "< 512 bytes",
+ "metric": "pulsar_entry_size_le_512",
+ "refId": "B",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_entry_size_le_1_kb{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "< 1 KB",
+ "metric": "pulsar_entry_size_le_1_kb",
+ "refId": "C",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_entry_size_le_2_kb{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "< 2 KB",
+ "metric": "pulsar_entry_size_le_2_kb",
+ "refId": "D",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_entry_size_le_4_kb{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "< 4 KB",
+ "metric": "pulsar_entry_size_le_4_kb",
+ "refId": "E",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_entry_size_le_16_kb{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "< 16 KB",
+ "metric": "pulsar_entry_size_le_16_kb",
+ "refId": "F",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_entry_size_le_100_kb{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "< 100 KB",
+ "metric": "pulsar_entry_size_le_100_kb",
+ "refId": "G",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_entry_size_le_1_mb{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "< 1 MB",
+ "metric": "pulsar_entry_size_le_1_mb",
+ "refId": "H",
+ "step": 10
+ },
+ {
+ "expr": "sum(pulsar_entry_size_overflow{cluster=~\"$cluster\",
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+ "format": "time_series",
+ "interval": "",
+ "intervalFactor": 2,
+ "legendFormat": "> 1 MB",
+ "metric": "pulsar_entry_size_le_overflow",
+ "refId": "I",
+ "step": 10
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeShift": null,
+ "title": "Storage entry size",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "format": "short",
+ "label": "msg / s",
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": false
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
+ }
+ ],
+ "refresh": "5s",
+ "schemaVersion": 16,
+ "style": "dark",
+ "tags": [],
+ "templating": {
+ "list": [
+ {
+ "allValue": null,
+ "current": {},
+ "datasource": "${DS_default}",
+ "hide": 0,
+ "includeAll": false,
+ "label": "Cluster",
+ "multi": false,
+ "name": "cluster",
+ "options": [],
+ "query": "{cluster=~\".+\"}",
+ "refresh": 1,
+ "regex": "/.*[^_]cluster=\\\"([^\\\"]+)\\\".*/",
+ "sort": 1,
+ "tagValuesQuery": "",
+ "tags": [],
+ "tagsQuery": "",
+ "type": "query",
+ "useTags": false
+ },
+ {
+ "allValue": null,
+ "current": {},
+ "datasource": "${DS_default}",
+ "hide": 0,
+ "includeAll": true,
+ "label": "Namespace",
+ "multi": false,
+ "name": "namespace",
+ "options": [],
+ "query": "{namespace=~\".+\"}",
+ "refresh": 2,
+ "regex": "/.*namespace=\\\"([^\\\"]+)\\\".*/",
+ "sort": 1,
+ "tagValuesQuery": "",
+ "tags": [],
+ "tagsQuery": "",
+ "type": "query",
+ "useTags": false
+ },
+ {
+ "allValue": null,
+ "current": {},
+ "datasource": "${DS_default}",
+ "hide": 0,
+ "includeAll": true,
+ "label": "Topic",
+ "multi": false,
+ "name": "topic",
+ "options": [],
+ "query": "{topic=~\".+\"}",
+ "refresh": 2,
+ "regex": "/.*topic=\\\"([^\\\"]+)\\\".*/",
+ "sort": 1,
+ "tagValuesQuery": "",
+ "tags": [],
+ "tagsQuery": "",
+ "type": "query",
+ "useTags": false
+ }
+ ]
+ },
+ "time": {
+ "from": "now-5m",
+ "to": "now"
+ },
+ "timepicker": {
+ "refresh_intervals": [
+ "5s",
+ "10s",
+ "30s",
+ "1m",
+ "5m",
+ "15m",
+ "30m",
+ "1h",
+ "2h",
+ "1d"
+ ],
+ "time_options": [
+ "5m",
+ "15m",
+ "1h",
+ "6h",
+ "12h",
+ "24h",
+ "2d",
+ "7d",
+ "30d"
+ ]
+ },
+ "timezone": "browser",
+ "title": "Pulsar - Topic",
+ "uid": "3xEtii5mk",
+ "version": 10
+}
\ No newline at end of file
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 182bc09..7656ecc 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -470,6 +470,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
/**** --- Metrics --- ****/
// If true, export topic-level metrics; otherwise, export namespace-level metrics
private boolean exposeTopicLevelMetricsInPrometheus = true;
+ private boolean exposeConsumerLevelMetricsInPrometheus = false;
/**** --- Functions --- ****/
private boolean functionsWorkerEnabled = false;
@@ -1615,10 +1616,18 @@ public class ServiceConfiguration implements
PulsarConfiguration {
return exposeTopicLevelMetricsInPrometheus;
}
+ /**
+ * Tells whether consumer-level metrics should be exported to Prometheus.
+ * The value is handed to PrometheusMetricsServlet when the /metrics servlet is registered.
+ *
+ * @return the {@code exposeConsumerLevelMetricsInPrometheus} setting (field default {@code false})
+ */
+ public boolean exposeConsumerLevelMetricsInPrometheus() {
+ final boolean consumerMetricsEnabled = this.exposeConsumerLevelMetricsInPrometheus;
+ return consumerMetricsEnabled;
+ }
+
/**
 * Sets whether metrics are exported to Prometheus at topic level (otherwise namespace level).
 *
 * @param exposeTopicLevelMetricsInPrometheus {@code true} to export topic-level metrics
 */
public void setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
    this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
}
+ /**
+ * Sets whether consumer-level metrics should be exported to Prometheus.
+ *
+ * @param exposeConsumerLevelMetricsInPrometheus {@code true} to export consumer-level metrics
+ */
+ public void setExposeConsumerLevelMetricsInPrometheus(boolean exposeConsumerLevelMetricsInPrometheus) {
+ this.exposeConsumerLevelMetricsInPrometheus = exposeConsumerLevelMetricsInPrometheus;
+ }
+
/**
 * Returns the configured schema registry storage class name.
 *
 * @return the value of {@code schemaRegistryStorageClassName}
 */
public String getSchemaRegistryStorageClassName() {
    final String storageClassName = this.schemaRegistryStorageClassName;
    return storageClassName;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 8a811ac..93a045e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -357,7 +357,7 @@ public class PulsarService implements AutoCloseable {
this.webService.addRestResources("/lookup",
"org.apache.pulsar.broker.lookup", true, attributeMap);
this.webService.addServlet("/metrics",
- new ServletHolder(new PrometheusMetricsServlet(this,
config.exposeTopicLevelMetricsInPrometheus())),
+ new ServletHolder(new PrometheusMetricsServlet(this,
config.exposeTopicLevelMetricsInPrometheus(),
config.exposeConsumerLevelMetricsInPrometheus())),
false, attributeMap);
if (config.isWebSocketServiceEnabled()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
new file mode 100644
index 0000000..0fadf3e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+/**
+ * Per-consumer counters collected for the broker's Prometheus metrics output.
+ *
+ * <p>Used as the value type of {@link AggregatedSubscriptionStats#consumerStat}. When
+ * AggregatedNamespaceStats.updateStats merges topic stats, it copies
+ * {@code blockedSubscriptionOnUnackedMsgs} and sums {@code msgRateRedeliver} and
+ * {@code unackedMessages}.
+ * NOTE(review): {@code msgRateOut}, {@code msgThroughputOut} and {@code availablePermits}
+ * are not touched by that merge; confirm where (or whether) they are populated.
+ */
+public class AggregatedConsumerStats {
+
+ // Blocked-on-unacked-messages flag of the consumer. Assigned (last value wins) during
+ // aggregation, not accumulated. NOTE(review): named "Subscription" but stored per consumer.
+ public boolean blockedSubscriptionOnUnackedMsgs;
+
+ // Redelivery rate; summed during aggregation (presumably msg/s -- confirm).
+ public double msgRateRedeliver;
+
+ // Number of unacknowledged messages; summed during aggregation.
+ public long unackedMessages;
+
+ // Outbound message rate (presumably msg/s -- confirm).
+ public double msgRateOut;
+
+ // Outbound throughput (presumably bytes/s -- confirm).
+ public double msgThroughputOut;
+
+ // Available permits of the consumer; not set by the namespace-level merge.
+ public long availablePermits;
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 299af88..995effc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -46,6 +46,8 @@ public class AggregatedNamespaceStats {
public Map<String, AggregatedReplicationStats> replicationStats = new
HashMap<>();
+ public Map<String, AggregatedSubscriptionStats> subscriptionStats = new
HashMap<>();
+
void updateStats(TopicStats stats) {
topicsCount++;
@@ -77,6 +79,22 @@ public class AggregatedNamespaceStats {
replStats.msgThroughputOut += as.msgThroughputOut;
replStats.replicationBacklog += as.replicationBacklog;
});
+
+ stats.subscriptionStats.forEach((n, as) -> {
+ AggregatedSubscriptionStats subsStats =
+ subscriptionStats.computeIfAbsent(n, k -> new
AggregatedSubscriptionStats());
+ subsStats.blockedSubscriptionOnUnackedMsgs =
as.blockedSubscriptionOnUnackedMsgs;
+ subsStats.msgBacklog += as.msgBacklog;
+ subsStats.msgRateRedeliver += as.msgRateRedeliver;
+ subsStats.unackedMessages += as.unackedMessages;
+ as.consumerStat.forEach((c, v) -> {
+ AggregatedConsumerStats consumerStats =
+ subsStats.consumerStat.computeIfAbsent(c, k -> new
AggregatedConsumerStats());
+ consumerStats.blockedSubscriptionOnUnackedMsgs =
v.blockedSubscriptionOnUnackedMsgs;
+ consumerStats.msgRateRedeliver += v.msgRateRedeliver;
+ consumerStats.unackedMessages += v.unackedMessages;
+ });
+ });
}
public void reset() {
@@ -95,6 +113,8 @@ public class AggregatedNamespaceStats {
storageReadRate = 0;
replicationStats.clear();
+ subscriptionStats.clear();
+
storageWriteLatencyBuckets.reset();
entrySizeBuckets.reset();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
new file mode 100644
index 0000000..c46bbf5
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import org.apache.pulsar.broker.service.Consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AggregatedSubscriptionStats {
+
+ public long msgBacklog;
+
+ public boolean blockedSubscriptionOnUnackedMsgs;
+
+ public double msgRateRedeliver;
+
+ public long unackedMessages;
+
+ public double msgRateOut;
+
+ public double msgThroughputOut;
+
+ public Map<Consumer, AggregatedConsumerStats> consumerStat = new
HashMap<>();
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 22115ea..d07bc38 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -43,7 +43,7 @@ public class NamespaceStatsAggregator {
}
};
- public static void generate(PulsarService pulsar, boolean
includeTopicMetrics, SimpleTextOutputStream stream) {
+ public static void generate(PulsarService pulsar, boolean
includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream
stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats topicStats = localTopicStats.get();
@@ -53,7 +53,7 @@ public class NamespaceStatsAggregator {
bundlesMap.forEach((bundle, topicsMap) -> {
topicsMap.forEach((name, topic) -> {
- getTopicStats(topic, topicStats);
+ getTopicStats(topic, topicStats, includeConsumerMetrics);
if (includeTopicMetrics) {
TopicStats.printTopicStats(stream, cluster, namespace,
name, topicStats);
@@ -71,12 +71,12 @@ public class NamespaceStatsAggregator {
});
}
- private static void getTopicStats(Topic topic, TopicStats stats) {
+ private static void getTopicStats(Topic topic, TopicStats stats, boolean
includeConsumerMetrics) {
stats.reset();
if (topic instanceof PersistentTopic) {
// Managed Ledger stats
- ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl)
((PersistentTopic)topic).getManagedLedger().getStats();
+ ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl)
((PersistentTopic) topic).getManagedLedger().getStats();
stats.storageSize = mlStats.getStoredMessagesSize();
@@ -108,7 +108,32 @@ public class NamespaceStatsAggregator {
stats.subscriptionsCount++;
stats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
+ AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+ .computeIfAbsent(name, k -> new
AggregatedSubscriptionStats());
+ subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog();
+
subscription.getConsumers().forEach(consumer -> {
+
+ // Consumer stats can be a lot if a subscription has many
consumers
+ if (includeConsumerMetrics) {
+ AggregatedConsumerStats consumerStats =
subsStats.consumerStat
+ .computeIfAbsent(consumer, k -> new
AggregatedConsumerStats());
+ consumerStats.unackedMessages =
consumer.getStats().unackedMessages;
+ consumerStats.msgRateRedeliver =
consumer.getStats().msgRateRedeliver;
+ consumerStats.msgRateOut = consumer.getStats().msgRateOut;
+ consumerStats.msgThroughputOut =
consumer.getStats().msgThroughputOut;
+ consumerStats.availablePermits =
consumer.getStats().availablePermits;
+ consumerStats.blockedSubscriptionOnUnackedMsgs =
consumer.getStats().blockedConsumerOnUnackedMsgs;
+ }
+
+ subsStats.unackedMessages +=
consumer.getStats().unackedMessages;
+ subsStats.msgRateRedeliver +=
consumer.getStats().msgRateRedeliver;
+ subsStats.msgRateOut += consumer.getStats().msgRateOut;
+ subsStats.msgThroughputOut +=
consumer.getStats().msgThroughputOut;
+ if (!subsStats.blockedSubscriptionOnUnackedMsgs &&
consumer.getStats().blockedConsumerOnUnackedMsgs) {
+ subsStats.blockedSubscriptionOnUnackedMsgs = true;
+ }
+
stats.consumersCount++;
stats.rateOut += consumer.getStats().msgRateOut;
stats.throughputOut += consumer.getStats().msgThroughputOut;
@@ -127,7 +152,7 @@ public class NamespaceStatsAggregator {
}
private static void printNamespaceStats(SimpleTextOutputStream stream,
String cluster, String namespace,
- AggregatedNamespaceStats stats) {
+ AggregatedNamespaceStats stats) {
metric(stream, cluster, namespace, "pulsar_topics_count",
stats.topicsCount);
metric(stream, cluster, namespace, "pulsar_subscriptions_count",
stats.subscriptionsCount);
metric(stream, cluster, namespace, "pulsar_producers_count",
stats.producersCount);
@@ -192,19 +217,19 @@ public class NamespaceStatsAggregator {
}
private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String name,
- long value) {
+ long value) {
stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace).write("\"} ");
stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
}
private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String name,
- double value) {
+ double value) {
stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace).write("\"} ");
stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
}
private static void metricWithRemoteCluster(SimpleTextOutputStream stream,
String cluster, String namespace,
- String name, String remoteCluster, double value) {
+ String name, String
remoteCluster, double value) {
stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace);
stream.write("\", remote_cluster=\"").write(remoteCluster).write("\"}
");
stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 167ec1c..8f0d30d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -63,14 +63,14 @@ public class PrometheusMetricsGenerator {
}).register(CollectorRegistry.defaultRegistry);
}
- public static void generate(PulsarService pulsar, boolean
includeTopicMetrics, OutputStream out) throws IOException {
+ public static void generate(PulsarService pulsar, boolean
includeTopicMetrics, boolean includeConsumerMetrics, OutputStream out) throws
IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
generateSystemMetrics(stream,
pulsar.getConfiguration().getClusterName());
- NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics,
stream);
+ NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics,
includeConsumerMetrics, stream);
FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
pulsar.getConfiguration().getClusterName(), stream);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index b924177..12058c1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -43,12 +43,14 @@ public class PrometheusMetricsServlet extends HttpServlet {
private final PulsarService pulsar;
private final boolean shouldExportTopicMetrics;
+ private final boolean shouldExportConsumerMetrics;
private ExecutorService executor = null;
- public PrometheusMetricsServlet(PulsarService pulsar, boolean
includeTopicMetrics) {
+ public PrometheusMetricsServlet(PulsarService pulsar, boolean
includeTopicMetrics, boolean includeConsumerMetrics) {
this.pulsar = pulsar;
this.shouldExportTopicMetrics = includeTopicMetrics;
+ this.shouldExportConsumerMetrics = includeConsumerMetrics;
}
@Override
@@ -65,7 +67,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
try {
res.setStatus(HttpStatus.OK_200);
res.setContentType("text/plain");
- PrometheusMetricsGenerator.generate(pulsar,
shouldExportTopicMetrics, res.getOutputStream());
+ PrometheusMetricsGenerator.generate(pulsar,
shouldExportTopicMetrics, shouldExportConsumerMetrics, res.getOutputStream());
context.complete();
} catch (IOException e) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index f5938b6..c592b8f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -44,6 +44,7 @@ class TopicStats {
double storageReadRate;
Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
+ Map<String, AggregatedSubscriptionStats> subscriptionStats = new
HashMap<>();
public void reset() {
subscriptionsCount = 0;
@@ -60,12 +61,13 @@ class TopicStats {
storageReadRate = 0;
replicationStats.clear();
+ subscriptionStats.clear();
storageWriteLatencyBuckets.reset();
entrySizeBuckets.reset();
}
static void printTopicStats(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- TopicStats stats) {
+ TopicStats stats) {
metric(stream, cluster, namespace, topic,
"pulsar_subscriptions_count", stats.subscriptionsCount);
metric(stream, cluster, namespace, topic, "pulsar_producers_count",
stats.producersCount);
@@ -108,12 +110,59 @@ class TopicStats {
metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
stats.entrySizeBuckets.getCount());
metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
stats.entrySizeBuckets.getSum());
+ stats.subscriptionStats.forEach((n, subsStats) -> {
+ metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_back_log", subsStats.msgBacklog);
+ metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver);
+ metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_unacked_massages", subsStats.unackedMessages);
+ metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_blocked_on_unacked_messages",
subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+ metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
+ metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
+ subsStats.consumerStat.forEach((c, consumerStats) -> {
+ metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(), "pulsar_consumer_msg_rate_redeliver",
consumerStats.msgRateRedeliver);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(), "pulsar_consumer_unacked_massages",
consumerStats.unackedMessages);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(), "pulsar_consumer_blocked_on_unacked_messages",
consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(), "pulsar_consumer_msg_throughput_out",
consumerStats.msgThroughputOut);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(), "pulsar_consumer_available_permits",
consumerStats.availablePermits);
+ });
+ });
+
}
private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- String name, double value) {
+ String name, double value) {
stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace)
.write("\", topic=\"").write(topic).write("\"} ");
stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
}
+
+ private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic, String subscription,
+ String name, long value) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace)
+ .write("\", topic=\"").write(topic).write("\",
subscription=\"").write(subscription).write("\"} ");
+ stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ }
+
+ private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic, String subscription,
+ String name, double value) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace)
+ .write("\", topic=\"").write(topic).write("\",
subscription=\"").write(subscription).write("\"} ");
+ stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ }
+
+ private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic, String subscription,
+ String consumerName, long consumerId, String
name, long value) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace)
+ .write("\", topic=\"").write(topic).write("\",
subscription=\"").write(subscription)
+ .write("\", consumer_name=\"").write(consumerName).write("\",
consumer_id=\"").write(consumerId).write("\"} ");
+ stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ }
+
+ private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic, String subscription,
+ String consumerName, long consumerId, String
name, double value) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace)
+ .write("\", topic=\"").write(topic).write("\",
subscription=\"").write(subscription)
+ .write("\", consumer_name=\"").write(consumerName).write("\",
consumer_id=\"").write(consumerId).write("\"} ");
+ stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 62d490e..6ad8d6d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -64,7 +64,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
}
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
- PrometheusMetricsGenerator.generate(pulsar, true, statsOut);
+ PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
String metricsStr = new String(statsOut.toByteArray());
Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -110,7 +110,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
}
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
- PrometheusMetricsGenerator.generate(pulsar, false, statsOut);
+ PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
String metricsStr = new String(statsOut.toByteArray());
Multimap<String, Metric> metrics = parseMetrics(metricsStr);