marksilcox commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r911822912
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java:
##########
@@ -305,155 +332,224 @@ private static void getTopicStats(Topic topic,
TopicStats stats, boolean include
});
}
- private static void buildDefaultBrokerStats(Map<String,
Collector.MetricFamilySamples> metrics, String cluster) {
- // Print metrics with 0 values. This is necessary to have the
available brokers being
- // reported in the brokers dashboard even if they don't have any topic
or traffic
- metric(metrics, cluster, "pulsar_topics_count", 0);
- metric(metrics, cluster, "pulsar_subscriptions_count", 0);
- metric(metrics, cluster, "pulsar_producers_count", 0);
- metric(metrics, cluster, "pulsar_consumers_count", 0);
- metric(metrics, cluster, "pulsar_rate_in", 0);
- metric(metrics, cluster, "pulsar_rate_out", 0);
- metric(metrics, cluster, "pulsar_throughput_in", 0);
- metric(metrics, cluster, "pulsar_throughput_out", 0);
- metric(metrics, cluster, "pulsar_storage_size", 0);
- metric(metrics, cluster, "pulsar_storage_logical_size", 0);
- metric(metrics, cluster, "pulsar_storage_write_rate", 0);
- metric(metrics, cluster, "pulsar_storage_read_rate", 0);
- metric(metrics, cluster, "pulsar_msg_backlog", 0);
+ private static void printTopicsCountStats(SimpleTextOutputStream stream,
String cluster,
+ Map<String, Long>
namespaceTopicsCount) {
+ stream.write("# TYPE ").write("pulsar_topics_count").write(" gauge\n");
+ stream.write("pulsar_topics_count")
+ .write("{cluster=\"").write(cluster).write("\"} ")
+ .write(0).write(' ').write(System.currentTimeMillis())
+ .write('\n');
+ namespaceTopicsCount.forEach((ns, topicCount) ->
stream.write("pulsar_topics_count")
+ .write("{cluster=\"").write(cluster)
+ .write("\",namespace=\"").write(ns)
+ .write("\"} ")
+ .write(topicCount).write(' ').write(System.currentTimeMillis())
+ .write('\n')
+ );
}
- private static void printTopicsCountStats(Map<String,
Collector.MetricFamilySamples> metrics, String cluster,
- String namespace,
- LongAdder topicsCount) {
- metric(metrics, cluster, namespace, "pulsar_topics_count",
topicsCount.sum());
+ private static void printNamespaceStats(SimpleTextOutputStream stream,
String cluster,
+ List<AggregatedNamespaceStats>
stats) {
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_topics_count",
stats, s -> s.topicsCount);
+ writeMetricWithBrokerDefault(stream, cluster,
"pulsar_subscriptions_count", stats, s -> s.subscriptionsCount);
+ writeMetricWithBrokerDefault(stream, cluster,
"pulsar_producers_count", stats, s -> s.producersCount);
+ writeMetricWithBrokerDefault(stream, cluster,
"pulsar_consumers_count", stats, s -> s.consumersCount);
+
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_rate_in", stats,
s -> s.rateIn);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_rate_out",
stats, s -> s.rateOut);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_throughput_in",
stats, s -> s.throughputIn);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_throughput_out",
stats, s -> s.throughputOut);
+ writeMetric(stream, cluster, "pulsar_consumer_msg_ack_rate", stats, s
-> s.messageAckRate);
+
+ writeMetric(stream, cluster, "pulsar_in_bytes_total", stats, s ->
s.bytesInCounter);
+ writeMetric(stream, cluster, "pulsar_in_messages_total", stats, s ->
s.msgInCounter);
+ writeMetric(stream, cluster, "pulsar_out_bytes_total", stats, s ->
s.bytesOutCounter);
+ writeMetric(stream, cluster, "pulsar_out_messages_total", stats, s ->
s.msgOutCounter);
+
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_size",
stats,
+ s -> s.managedLedgerStats.storageSize);
+ writeMetricWithBrokerDefault(stream, cluster,
"pulsar_storage_logical_size", stats,
+ s -> s.managedLedgerStats.storageLogicalSize);
+ writeMetric(stream, cluster, "pulsar_storage_backlog_size", stats, s
-> s.managedLedgerStats.backlogSize);
+ writeMetric(stream, cluster, "pulsar_storage_offloaded_size",
+ stats, s -> s.managedLedgerStats.offloadedStorageUsed);
+
+ writeMetricWithBrokerDefault(stream, cluster,
"pulsar_storage_write_rate", stats,
+ s -> s.managedLedgerStats.storageWriteRate);
+ writeMetricWithBrokerDefault(stream, cluster,
"pulsar_storage_read_rate", stats,
+ s -> s.managedLedgerStats.storageReadRate);
+
+ writeMetric(stream, cluster, "pulsar_subscription_delayed", stats, s
-> s.msgDelayed);
+
+ writeMsgBacklog(stream, cluster, stats, s -> s.msgBacklog);
+
+ stats.forEach(s ->
s.managedLedgerStats.storageWriteLatencyBuckets.refresh());
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_0_5",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[0]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_1",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[1]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_5",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[2]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_10",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[3]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_20",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[4]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_50",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[5]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_100",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[6]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_200",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[7]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_1000",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[8]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_overflow",
stats,
+ s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[9]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_count",
+ stats, s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getCount());
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_sum",
+ stats, s ->
s.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+
+ stats.forEach(s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh());
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_0_5", stats,
+ s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[0]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_1", stats,
+ s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[1]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_5", stats,
+ s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[2]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_10", stats,
+ s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[3]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_20", stats,
+ s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[4]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_50", stats,
+ s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[5]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_100", stats,
+ s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[6]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_200", stats,
+ s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[7]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_le_1000",
+ stats, s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[8]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_overflow",
+ stats, s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[9]);
+ writeMetric(stream, cluster,
"pulsar_storage_ledger_write_latency_count",
+ stats, s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_sum",
+ stats, s ->
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+
+ stats.forEach(s -> s.managedLedgerStats.entrySizeBuckets.refresh());
+ writeMetric(stream, cluster, "pulsar_entry_size_le_128", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[0]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_512", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[1]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_1_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[2]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_2_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[3]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_4_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[4]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_16_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[5]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_100_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[6]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_1_mb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[7]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_overflow", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[8]);
+ writeMetric(stream, cluster, "pulsar_entry_size_count",
+ stats, s -> s.managedLedgerStats.entrySizeBuckets.getCount());
+ writeMetric(stream, cluster, "pulsar_entry_size_sum",
+ stats, s -> s.managedLedgerStats.entrySizeBuckets.getSum());
+
+ writeReplicationStat(stream, cluster, "pulsar_replication_rate_in",
stats,
+ replStats -> replStats.msgRateIn);
+ writeReplicationStat(stream, cluster, "pulsar_replication_rate_out",
stats,
+ replStats -> replStats.msgRateOut);
+ writeReplicationStat(stream, cluster,
"pulsar_replication_throughput_in", stats,
+ replStats -> replStats.msgThroughputIn);
+ writeReplicationStat(stream, cluster,
"pulsar_replication_throughput_out", stats,
+ replStats -> replStats.msgThroughputOut);
+ writeReplicationStat(stream, cluster, "pulsar_replication_backlog",
stats,
+ replStats -> replStats.replicationBacklog);
+ writeReplicationStat(stream, cluster,
"pulsar_replication_connected_count", stats,
+ replStats -> replStats.connectedCount);
+ writeReplicationStat(stream, cluster,
"pulsar_replication_rate_expired", stats,
+ replStats -> replStats.msgRateExpired);
+ writeReplicationStat(stream, cluster,
"pulsar_replication_delay_in_seconds", stats,
+ replStats -> replStats.replicationDelayInSeconds);
}
- private static void printNamespaceStats(Map<String,
Collector.MetricFamilySamples> metrics, String cluster,
- String namespace,
- AggregatedNamespaceStats stats) {
- metric(metrics, cluster, namespace, "pulsar_topics_count",
stats.topicsCount);
- metric(metrics, cluster, namespace, "pulsar_subscriptions_count",
stats.subscriptionsCount);
- metric(metrics, cluster, namespace, "pulsar_producers_count",
stats.producersCount);
- metric(metrics, cluster, namespace, "pulsar_consumers_count",
stats.consumersCount);
-
- metric(metrics, cluster, namespace, "pulsar_rate_in", stats.rateIn);
- metric(metrics, cluster, namespace, "pulsar_rate_out", stats.rateOut);
- metric(metrics, cluster, namespace, "pulsar_throughput_in",
stats.throughputIn);
- metric(metrics, cluster, namespace, "pulsar_throughput_out",
stats.throughputOut);
- metric(metrics, cluster, namespace, "pulsar_consumer_msg_ack_rate",
stats.messageAckRate);
-
- metric(metrics, cluster, namespace, "pulsar_in_bytes_total",
stats.bytesInCounter);
- metric(metrics, cluster, namespace, "pulsar_in_messages_total",
stats.msgInCounter);
- metric(metrics, cluster, namespace, "pulsar_out_bytes_total",
stats.bytesOutCounter);
- metric(metrics, cluster, namespace, "pulsar_out_messages_total",
stats.msgOutCounter);
-
- metric(metrics, cluster, namespace, "pulsar_storage_size",
stats.managedLedgerStats.storageSize);
- metric(metrics, cluster, namespace, "pulsar_storage_logical_size",
stats.managedLedgerStats.storageLogicalSize);
- metric(metrics, cluster, namespace, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize);
- metric(metrics, cluster, namespace, "pulsar_storage_offloaded_size",
- stats.managedLedgerStats.offloadedStorageUsed);
-
- metric(metrics, cluster, namespace, "pulsar_storage_write_rate",
stats.managedLedgerStats.storageWriteRate);
- metric(metrics, cluster, namespace, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate);
-
- metric(metrics, cluster, namespace, "pulsar_subscription_delayed",
stats.msgDelayed);
-
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_msg_backlog", "local", stats.msgBacklog);
-
- stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
- long[] latencyBuckets =
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_1", latencyBuckets[1]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_5", latencyBuckets[2]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_10", latencyBuckets[3]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_20", latencyBuckets[4]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_50", latencyBuckets[5]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_100", latencyBuckets[6]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_200", latencyBuckets[7]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_overflow", latencyBuckets[9]);
- metric(metrics, cluster, namespace,
"pulsar_storage_write_latency_count",
-
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
-
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
- long[] ledgerWritelatencyBuckets =
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_1000",
- ledgerWritelatencyBuckets[8]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_overflow",
- ledgerWritelatencyBuckets[9]);
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_count",
-
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
- metric(metrics, cluster, namespace,
"pulsar_storage_ledger_write_latency_sum",
-
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
-
- stats.managedLedgerStats.entrySizeBuckets.refresh();
- long[] entrySizeBuckets =
stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_128",
entrySizeBuckets[0]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_512",
entrySizeBuckets[1]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_1_kb",
entrySizeBuckets[2]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_2_kb",
entrySizeBuckets[3]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_4_kb",
entrySizeBuckets[4]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_16_kb",
entrySizeBuckets[5]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_100_kb",
entrySizeBuckets[6]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_1_mb",
entrySizeBuckets[7]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_overflow",
entrySizeBuckets[8]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_count",
- stats.managedLedgerStats.entrySizeBuckets.getCount());
- metric(metrics, cluster, namespace, "pulsar_entry_size_sum",
- stats.managedLedgerStats.entrySizeBuckets.getSum());
-
- if (!stats.replicationStats.isEmpty()) {
- stats.replicationStats.forEach((remoteCluster, replStats) -> {
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_replication_rate_in", remoteCluster,
- replStats.msgRateIn);
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_replication_rate_out", remoteCluster,
- replStats.msgRateOut);
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_replication_throughput_in", remoteCluster,
- replStats.msgThroughputIn);
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_replication_throughput_out", remoteCluster,
- replStats.msgThroughputOut);
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_replication_backlog", remoteCluster,
- replStats.replicationBacklog);
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_replication_connected_count",
- remoteCluster,
- replStats.connectedCount);
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_replication_rate_expired", remoteCluster,
- replStats.msgRateExpired);
- metricWithRemoteCluster(metrics, cluster, namespace,
"pulsar_replication_delay_in_seconds",
- remoteCluster, replStats.replicationDelayInSeconds);
- });
- }
+ private static void writeMetricWithBrokerDefault(SimpleTextOutputStream
stream, String cluster, String name,
+
List<AggregatedNamespaceStats> allNamespaceStats,
+
Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+ stream.write("# TYPE ").write(name).write(" gauge\n");
+ stream.write(name)
Review Comment:
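The current behaviour is to always write the broker-level metrics with a value of 0 (even when the broker has no topics or traffic), as done here: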
``` java
private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
    // Print metrics with 0 values. This is necessary to have the available brokers being
    // reported in the brokers dashboard even if they don't have any topic or traffic
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]