jiazhai closed pull request #2299:  Add some metrics to prometheus.
URL: https://github.com/apache/incubator-pulsar/pull/2299
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index a639b85222..5f4a4e95f6 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -479,6 +479,9 @@ webSocketSessionIdleTimeoutMillis=300000
 # Enable topic level metrics
 exposeTopicLevelMetricsInPrometheus=true
 
+# Enable consumer level metrics. default is false
+# exposeConsumerLevelMetricsInPrometheus=false
+
 ### --- Functions --- ###
 
 # Enable Functions Worker Service in Broker
diff --git a/docker/grafana/dashboards/topic.json 
b/docker/grafana/dashboards/topic.json
new file mode 100644
index 0000000000..d53a0183b1
--- /dev/null
+++ b/docker/grafana/dashboards/topic.json
@@ -0,0 +1,1160 @@
+{
+  "__inputs": [
+    {
+      "name": "DS_default",
+      "label": "default",
+      "description": "",
+      "type": "datasource",
+      "pluginId": "prometheus",
+      "pluginName": "Prometheus"
+    }
+  ],
+  "__requires": [
+    {
+      "type": "grafana",
+      "id": "grafana",
+      "name": "Grafana",
+      "version": "5.1.0"
+    },
+    {
+      "type": "panel",
+      "id": "graph",
+      "name": "Graph",
+      "version": "5.0.0"
+    },
+    {
+      "type": "datasource",
+      "id": "prometheus",
+      "name": "Prometheus",
+      "version": "5.0.0"
+    }
+  ],
+  "annotations": {
+    "list": [
+      {
+        "builtIn": 1,
+        "datasource": "-- Grafana --",
+        "enable": true,
+        "hide": true,
+        "iconColor": "rgba(0, 211, 255, 1)",
+        "name": "Annotations & Alerts",
+        "type": "dashboard"
+      }
+    ]
+  },
+  "editable": true,
+  "gnetId": null,
+  "graphTooltip": 0,
+  "id": null,
+  "iteration": 1533285490117,
+  "links": [],
+  "panels": [
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 0
+      },
+      "id": 16,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "hideEmpty": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_rate_in{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "hide": false,
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "{{cluster}} - {{namespace}}",
+          "metric": "pulsar_rate_in",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local publish rate",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "msg/s",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 12,
+        "y": 0
+      },
+      "id": 2,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_subscription_msg_rate_out{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "{{subscription}}",
+          "metric": "pulsar_rate_out",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local delivery rate",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "msg / s",
+          "logBase": 1,
+          "max": null,
+          "min": "0",
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 7
+      },
+      "id": 5,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_throughput_in{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "{{cluster}} - {{namespace}}",
+          "metric": "pulsar_throughput_in",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local publish throughput (bytes/s)",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "Bps",
+          "label": "",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "description": "",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 12,
+        "y": 7
+      },
+      "id": 8,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": 
"pulsar_subscription_msg_throughput_out{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "{{subscription}}",
+          "metric": "pulsar_throughput_out",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local delivery throughput (bytes/s)",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "Bps",
+          "label": "",
+          "logBase": 1,
+          "max": null,
+          "min": "0",
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "decimals": 0,
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 14
+      },
+      "id": 7,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(pulsar_producers_count{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"})",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "producers",
+          "metric": "pulsar_producers_count",
+          "refId": "A",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_subscriptions_count{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"})",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "subscriptions",
+          "metric": "pulsar_subscriptions_count",
+          "refId": "B",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_consumers_count{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"})",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "consumers",
+          "metric": "pulsar_consumers_count",
+          "refId": "C",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Topics - Producers - Subscriptions - Consumers",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": [
+          "current"
+        ]
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "count",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 12,
+        "y": 14
+      },
+      "id": 4,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_subscription_back_log{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "{{subscription}}",
+          "metric": "pulsar_msg_backlog",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local backlog",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "Messages",
+          "logBase": 1,
+          "max": null,
+          "min": "0",
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {
+        "0 - 0.5 ms": "#2F575E",
+        "0.5 - 1 ms": "#3F6833",
+        "1 - 5 ms": "#629E51",
+        "10 - 20 ms": "#E5A8E2",
+        "100 - 200 ms": "#EF843C",
+        "20 - 50 ms": "#65C5DB",
+        "200 ms - 1 s": "#EA6460",
+        "5 - 10 ms": "#1F78C1",
+        "50 - 100 ms": "#E5AC0E",
+        "< +Inf ms": "#BF1B00",
+        "< 0.5 ms": "#508642",
+        "> 1 s": "#BF1B00"
+      },
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 5,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 21
+      },
+      "id": 3,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 0,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [
+        {
+          "alias": "< 100 ms",
+          "yaxis": 1
+        }
+      ],
+      "spaceLength": 10,
+      "stack": true,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_0_5{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "0 - 0.5 ms",
+          "metric": "pulsar_add_entry_latency_le_0_5",
+          "refId": "A",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_1{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "0.5 - 1 ms",
+          "metric": "pulsar_add_entry_latency_le_1",
+          "refId": "B",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_5{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "1 - 5 ms",
+          "metric": "pulsar_add_entry_latency_le_5",
+          "refId": "C",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_10{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "5 - 10 ms",
+          "metric": "pulsar_add_entry_latency_le_10",
+          "refId": "D",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_20{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "10 - 20 ms",
+          "metric": "pulsar_add_entry_latency_le_20",
+          "refId": "E",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_50{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "20 - 50 ms",
+          "metric": "pulsar_add_entry_latency_le_50",
+          "refId": "F",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_100{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "hide": false,
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "50 - 100 ms",
+          "metric": "pulsar_add_entry_latency_le_100",
+          "refId": "G",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_200{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "100 - 200 ms",
+          "metric": "pulsar_add_entry_latency_le_200",
+          "refId": "H",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_le_1000{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "200 ms - 1 s",
+          "metric": "pulsar_add_entry_latency_le_1000",
+          "refId": "I",
+          "step": 10
+        },
+        {
+          "expr": 
"sum(pulsar_storage_write_latency_overflow{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "> 1 s",
+          "metric": "pulsar_add_entry_latency_overflow",
+          "refId": "J",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Storage Write Latency",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "msg / s",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 12,
+        "y": 21
+      },
+      "id": 9,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(pulsar_storage_size{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"})",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "$namespace",
+          "metric": "pulsar_storage_size",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Storage Size",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "decbytes",
+          "label": "",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 6,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 28
+      },
+      "id": 12,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 0,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [
+        {
+          "alias": "< 2 KB",
+          "yaxis": 1
+        }
+      ],
+      "spaceLength": 10,
+      "stack": true,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(pulsar_entry_size_le_128{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 128 bytes",
+          "metric": "pulsar_entry_size_le_128",
+          "refId": "A",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_512{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 512 bytes",
+          "metric": "pulsar_entry_size_le_512",
+          "refId": "B",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_1_kb{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 1 KB",
+          "metric": "pulsar_entry_size_le_1_kb",
+          "refId": "C",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_2_kb{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 2 KB",
+          "metric": "pulsar_entry_size_le_2_kb",
+          "refId": "D",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_4_kb{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 4 KB",
+          "metric": "pulsar_entry_size_le_4_kb",
+          "refId": "E",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_16_kb{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 16 KB",
+          "metric": "pulsar_entry_size_le_16_kb",
+          "refId": "F",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_100_kb{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 100 KB",
+          "metric": "pulsar_entry_size_le_100_kb",
+          "refId": "G",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_1_mb{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 1 MB",
+          "metric": "pulsar_entry_size_le_1_mb",
+          "refId": "H",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_overflow{cluster=~\"$cluster\", 
namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "> 1 MB",
+          "metric": "pulsar_entry_size_le_overflow",
+          "refId": "I",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Storage entry size",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "msg / s",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    }
+  ],
+  "refresh": "5s",
+  "schemaVersion": 16,
+  "style": "dark",
+  "tags": [],
+  "templating": {
+    "list": [
+      {
+        "allValue": null,
+        "current": {},
+        "datasource": "${DS_default}",
+        "hide": 0,
+        "includeAll": false,
+        "label": "Cluster",
+        "multi": false,
+        "name": "cluster",
+        "options": [],
+        "query": "{cluster=~\".+\"}",
+        "refresh": 1,
+        "regex": "/.*[^_]cluster=\\\"([^\\\"]+)\\\".*/",
+        "sort": 1,
+        "tagValuesQuery": "",
+        "tags": [],
+        "tagsQuery": "",
+        "type": "query",
+        "useTags": false
+      },
+      {
+        "allValue": null,
+        "current": {},
+        "datasource": "${DS_default}",
+        "hide": 0,
+        "includeAll": true,
+        "label": "Namespace",
+        "multi": false,
+        "name": "namespace",
+        "options": [],
+        "query": "{namespace=~\".+\"}",
+        "refresh": 2,
+        "regex": "/.*namespace=\\\"([^\\\"]+)\\\".*/",
+        "sort": 1,
+        "tagValuesQuery": "",
+        "tags": [],
+        "tagsQuery": "",
+        "type": "query",
+        "useTags": false
+      },
+      {
+        "allValue": null,
+        "current": {},
+        "datasource": "${DS_default}",
+        "hide": 0,
+        "includeAll": true,
+        "label": "Topic",
+        "multi": false,
+        "name": "topic",
+        "options": [],
+        "query": "{topic=~\".+\"}",
+        "refresh": 2,
+        "regex": "/.*topic=\\\"([^\\\"]+)\\\".*/",
+        "sort": 1,
+        "tagValuesQuery": "",
+        "tags": [],
+        "tagsQuery": "",
+        "type": "query",
+        "useTags": false
+      }
+    ]
+  },
+  "time": {
+    "from": "now-5m",
+    "to": "now"
+  },
+  "timepicker": {
+    "refresh_intervals": [
+      "5s",
+      "10s",
+      "30s",
+      "1m",
+      "5m",
+      "15m",
+      "30m",
+      "1h",
+      "2h",
+      "1d"
+    ],
+    "time_options": [
+      "5m",
+      "15m",
+      "1h",
+      "6h",
+      "12h",
+      "24h",
+      "2d",
+      "7d",
+      "30d"
+    ]
+  },
+  "timezone": "browser",
+  "title": "Pulsar - Topic",
+  "uid": "3xEtii5mk",
+  "version": 10
+}
\ No newline at end of file
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 182bc0939f..7656eccda8 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -470,6 +470,7 @@
     /**** --- Metrics --- ****/
     // If true, export topic level metrics otherwise namespace level
     private boolean exposeTopicLevelMetricsInPrometheus = true;
+    private boolean exposeConsumerLevelMetricsInPrometheus = false;
 
     /**** --- Functions --- ****/
     private boolean functionsWorkerEnabled = false;
@@ -1615,10 +1616,18 @@ public boolean exposeTopicLevelMetricsInPrometheus() {
         return exposeTopicLevelMetricsInPrometheus;
     }
 
+    public boolean exposeConsumerLevelMetricsInPrometheus() {
+        return exposeConsumerLevelMetricsInPrometheus;
+    }
+
     public void setExposeTopicLevelMetricsInPrometheus(boolean 
exposeTopicLevelMetricsInPrometheus) {
         this.exposeTopicLevelMetricsInPrometheus = 
exposeTopicLevelMetricsInPrometheus;
     }
 
+    public void setExposeConsumerLevelMetricsInPrometheus(boolean 
exposeConsumerLevelMetricsInPrometheus) {
+        this.exposeConsumerLevelMetricsInPrometheus = 
exposeConsumerLevelMetricsInPrometheus;
+    }
+
     public String getSchemaRegistryStorageClassName() {
        return schemaRegistryStorageClassName;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 8a811ac667..93a045e537 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -357,7 +357,7 @@ public void start() throws PulsarServerException {
             this.webService.addRestResources("/lookup", 
"org.apache.pulsar.broker.lookup", true, attributeMap);
 
             this.webService.addServlet("/metrics",
-                    new ServletHolder(new PrometheusMetricsServlet(this, 
config.exposeTopicLevelMetricsInPrometheus())),
+                    new ServletHolder(new PrometheusMetricsServlet(this, 
config.exposeTopicLevelMetricsInPrometheus(), 
config.exposeConsumerLevelMetricsInPrometheus())),
                     false, attributeMap);
 
             if (config.isWebSocketServiceEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
new file mode 100644
index 0000000000..0fadf3e6ac
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+public class AggregatedConsumerStats {
+
+    public boolean blockedSubscriptionOnUnackedMsgs;
+
+    public double msgRateRedeliver;
+
+    public long unackedMessages;
+
+    public double msgRateOut;
+
+    public double msgThroughputOut;
+
+    public long availablePermits;
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 299af88579..995effc244 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -46,6 +46,8 @@
 
     public Map<String, AggregatedReplicationStats> replicationStats = new 
HashMap<>();
 
+    public Map<String, AggregatedSubscriptionStats> subscriptionStats = new 
HashMap<>();
+
     void updateStats(TopicStats stats) {
         topicsCount++;
 
@@ -77,6 +79,22 @@ void updateStats(TopicStats stats) {
             replStats.msgThroughputOut += as.msgThroughputOut;
             replStats.replicationBacklog += as.replicationBacklog;
         });
+
+        stats.subscriptionStats.forEach((n, as) -> {
+            AggregatedSubscriptionStats subsStats =
+                    subscriptionStats.computeIfAbsent(n, k -> new 
AggregatedSubscriptionStats());
+            subsStats.blockedSubscriptionOnUnackedMsgs = 
as.blockedSubscriptionOnUnackedMsgs;
+            subsStats.msgBacklog += as.msgBacklog;
+            subsStats.msgRateRedeliver += as.msgRateRedeliver;
+            subsStats.unackedMessages += as.unackedMessages;
+            as.consumerStat.forEach((c, v) -> {
+                AggregatedConsumerStats consumerStats =
+                        subsStats.consumerStat.computeIfAbsent(c, k -> new 
AggregatedConsumerStats());
+                consumerStats.blockedSubscriptionOnUnackedMsgs = 
v.blockedSubscriptionOnUnackedMsgs;
+                consumerStats.msgRateRedeliver += v.msgRateRedeliver;
+                consumerStats.unackedMessages += v.unackedMessages;
+            });
+        });
     }
 
     public void reset() {
@@ -95,6 +113,8 @@ public void reset() {
         storageReadRate = 0;
 
         replicationStats.clear();
+        subscriptionStats.clear();
+
         storageWriteLatencyBuckets.reset();
         entrySizeBuckets.reset();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
new file mode 100644
index 0000000000..c46bbf5a2b
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import org.apache.pulsar.broker.service.Consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AggregatedSubscriptionStats {
+
+    public long msgBacklog;
+
+    public boolean blockedSubscriptionOnUnackedMsgs;
+
+    public double msgRateRedeliver;
+
+    public long unackedMessages;
+
+    public double msgRateOut;
+
+    public double msgThroughputOut;
+
+    public Map<Consumer, AggregatedConsumerStats> consumerStat = new 
HashMap<>();
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 22115ea607..d07bc386ad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -43,7 +43,7 @@ protected TopicStats initialValue() throws Exception {
         }
     };
 
-    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, SimpleTextOutputStream stream) {
+    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream 
stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
         TopicStats topicStats = localTopicStats.get();
@@ -53,7 +53,7 @@ public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, S
 
             bundlesMap.forEach((bundle, topicsMap) -> {
                 topicsMap.forEach((name, topic) -> {
-                    getTopicStats(topic, topicStats);
+                    getTopicStats(topic, topicStats, includeConsumerMetrics);
 
                     if (includeTopicMetrics) {
                         TopicStats.printTopicStats(stream, cluster, namespace, 
name, topicStats);
@@ -71,12 +71,12 @@ public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, S
         });
     }
 
-    private static void getTopicStats(Topic topic, TopicStats stats) {
+    private static void getTopicStats(Topic topic, TopicStats stats, boolean 
includeConsumerMetrics) {
         stats.reset();
 
         if (topic instanceof PersistentTopic) {
             // Managed Ledger stats
-            ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) 
((PersistentTopic)topic).getManagedLedger().getStats();
+            ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) 
((PersistentTopic) topic).getManagedLedger().getStats();
 
             stats.storageSize = mlStats.getStoredMessagesSize();
 
@@ -108,7 +108,32 @@ private static void getTopicStats(Topic topic, TopicStats 
stats) {
             stats.subscriptionsCount++;
             stats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
 
+            AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+                    .computeIfAbsent(name, k -> new 
AggregatedSubscriptionStats());
+            subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog();
+
             subscription.getConsumers().forEach(consumer -> {
+
+                // Consumer stats can be a lot if a subscription has many 
consumers
+                if (includeConsumerMetrics) {
+                    AggregatedConsumerStats consumerStats = 
subsStats.consumerStat
+                            .computeIfAbsent(consumer, k -> new 
AggregatedConsumerStats());
+                    consumerStats.unackedMessages = 
consumer.getStats().unackedMessages;
+                    consumerStats.msgRateRedeliver = 
consumer.getStats().msgRateRedeliver;
+                    consumerStats.msgRateOut = consumer.getStats().msgRateOut;
+                    consumerStats.msgThroughputOut = 
consumer.getStats().msgThroughputOut;
+                    consumerStats.availablePermits = 
consumer.getStats().availablePermits;
+                    consumerStats.blockedSubscriptionOnUnackedMsgs = 
consumer.getStats().blockedConsumerOnUnackedMsgs;
+                }
+
+                subsStats.unackedMessages += 
consumer.getStats().unackedMessages;
+                subsStats.msgRateRedeliver += 
consumer.getStats().msgRateRedeliver;
+                subsStats.msgRateOut += consumer.getStats().msgRateOut;
+                subsStats.msgThroughputOut += 
consumer.getStats().msgThroughputOut;
+                if (!subsStats.blockedSubscriptionOnUnackedMsgs && 
consumer.getStats().blockedConsumerOnUnackedMsgs) {
+                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
+                }
+
                 stats.consumersCount++;
                 stats.rateOut += consumer.getStats().msgRateOut;
                 stats.throughputOut += consumer.getStats().msgThroughputOut;
@@ -127,7 +152,7 @@ private static void getTopicStats(Topic topic, TopicStats 
stats) {
     }
 
     private static void printNamespaceStats(SimpleTextOutputStream stream, 
String cluster, String namespace,
-            AggregatedNamespaceStats stats) {
+                                            AggregatedNamespaceStats stats) {
         metric(stream, cluster, namespace, "pulsar_topics_count", 
stats.topicsCount);
         metric(stream, cluster, namespace, "pulsar_subscriptions_count", 
stats.subscriptionsCount);
         metric(stream, cluster, namespace, "pulsar_producers_count", 
stats.producersCount);
@@ -192,19 +217,19 @@ private static void 
printNamespaceStats(SimpleTextOutputStream stream, String cl
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String name,
-            long value) {
+                               long value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", 
namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String name,
-            double value) {
+                               double value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", 
namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metricWithRemoteCluster(SimpleTextOutputStream stream, 
String cluster, String namespace,
-            String name, String remoteCluster, double value) {
+                                                String name, String 
remoteCluster, double value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", 
namespace=\"").write(namespace);
         stream.write("\", remote_cluster=\"").write(remoteCluster).write("\"} 
");
         stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 167ec1c619..8f0d30d56a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -63,14 +63,14 @@ public double get() {
         }).register(CollectorRegistry.defaultRegistry);
     }
 
-    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, OutputStream out) throws IOException {
+    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics, OutputStream out) throws 
IOException {
         ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
         try {
             SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
 
             generateSystemMetrics(stream, 
pulsar.getConfiguration().getClusterName());
 
-            NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, 
stream);
+            NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, 
includeConsumerMetrics, stream);
 
             FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
                     pulsar.getConfiguration().getClusterName(), stream);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index b924177c3a..12058c1dec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -43,12 +43,14 @@
 
     private final PulsarService pulsar;
     private final boolean shouldExportTopicMetrics;
+    private final boolean shouldExportConsumerMetrics;
 
     private ExecutorService executor = null;
 
-    public PrometheusMetricsServlet(PulsarService pulsar, boolean 
includeTopicMetrics) {
+    public PrometheusMetricsServlet(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics) {
         this.pulsar = pulsar;
         this.shouldExportTopicMetrics = includeTopicMetrics;
+        this.shouldExportConsumerMetrics = includeConsumerMetrics;
     }
 
     @Override
@@ -65,7 +67,7 @@ protected void doGet(HttpServletRequest request, 
HttpServletResponse response)
             try {
                 res.setStatus(HttpStatus.OK_200);
                 res.setContentType("text/plain");
-                PrometheusMetricsGenerator.generate(pulsar, 
shouldExportTopicMetrics, res.getOutputStream());
+                PrometheusMetricsGenerator.generate(pulsar, 
shouldExportTopicMetrics, shouldExportConsumerMetrics, res.getOutputStream());
                 context.complete();
 
             } catch (IOException e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index f5938b613a..c592b8f49b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -44,6 +44,7 @@
     double storageReadRate;
 
     Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
+    Map<String, AggregatedSubscriptionStats> subscriptionStats = new 
HashMap<>();
 
     public void reset() {
         subscriptionsCount = 0;
@@ -60,12 +61,13 @@ public void reset() {
         storageReadRate = 0;
 
         replicationStats.clear();
+        subscriptionStats.clear();
         storageWriteLatencyBuckets.reset();
         entrySizeBuckets.reset();
     }
 
     static void printTopicStats(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            TopicStats stats) {
+                                TopicStats stats) {
 
         metric(stream, cluster, namespace, topic, 
"pulsar_subscriptions_count", stats.subscriptionsCount);
         metric(stream, cluster, namespace, topic, "pulsar_producers_count", 
stats.producersCount);
@@ -108,12 +110,59 @@ static void printTopicStats(SimpleTextOutputStream 
stream, String cluster, Strin
         metric(stream, cluster, namespace, topic, "pulsar_entry_size_count", 
stats.entrySizeBuckets.getCount());
         metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", 
stats.entrySizeBuckets.getSum());
 
+        stats.subscriptionStats.forEach((n, subsStats) -> {
+            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_back_log", subsStats.msgBacklog);
+            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver);
+            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_unacked_massages", subsStats.unackedMessages);
+            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_blocked_on_unacked_messages", 
subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
+            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
+            subsStats.consumerStat.forEach((c, consumerStats) -> {
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(), "pulsar_consumer_msg_rate_redeliver", 
consumerStats.msgRateRedeliver);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(), "pulsar_consumer_unacked_massages", 
consumerStats.unackedMessages);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(), "pulsar_consumer_blocked_on_unacked_messages", 
consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(), "pulsar_consumer_msg_throughput_out", 
consumerStats.msgThroughputOut);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(), "pulsar_consumer_available_permits", 
consumerStats.availablePermits);
+            });
+        });
+
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String name, double value) {
+                               String name, double value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", 
namespace=\"").write(namespace)
                 .write("\", topic=\"").write(topic).write("\"} ");
         stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
     }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic, String subscription,
+                               String name, long value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", 
namespace=\"").write(namespace)
+                .write("\", topic=\"").write(topic).write("\", 
subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic, String subscription,
+                               String name, double value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", 
namespace=\"").write(namespace)
+                .write("\", topic=\"").write(topic).write("\", 
subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic, String subscription,
+                               String consumerName, long consumerId, String 
name, long value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", 
namespace=\"").write(namespace)
+                .write("\", topic=\"").write(topic).write("\", 
subscription=\"").write(subscription)
+                .write("\", consumer_name=\"").write(consumerName).write("\", 
consumer_id=\"").write(consumerId).write("\"} ");
+        stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic, String subscription,
+                               String consumerName, long consumerId, String 
name, double value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", 
namespace=\"").write(namespace)
+                .write("\", topic=\"").write(topic).write("\", 
subscription=\"").write(subscription)
+                .write("\", consumer_name=\"").write(consumerName).write("\", 
consumer_id=\"").write(consumerId).write("\"} ");
+        stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n');
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 62d490e130..6ad8d6d5ce 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -64,7 +64,7 @@ public void testPerTopicStats() throws Exception {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, statsOut);
+        PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
         String metricsStr = new String(statsOut.toByteArray());
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -110,7 +110,7 @@ public void testPerNamespaceStats() throws Exception {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, statsOut);
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
         String metricsStr = new String(statsOut.toByteArray());
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to