marksilcox commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r911822912


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java:
##########
@@ -305,155 +332,224 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
                 });
     }
 
-    private static void buildDefaultBrokerStats(Map<String, Collector.MetricFamilySamples> metrics, String cluster) {
-        // Print metrics with 0 values. This is necessary to have the available brokers being
-        // reported in the brokers dashboard even if they don't have any topic or traffic
-        metric(metrics, cluster, "pulsar_topics_count", 0);
-        metric(metrics, cluster, "pulsar_subscriptions_count", 0);
-        metric(metrics, cluster, "pulsar_producers_count", 0);
-        metric(metrics, cluster, "pulsar_consumers_count", 0);
-        metric(metrics, cluster, "pulsar_rate_in", 0);
-        metric(metrics, cluster, "pulsar_rate_out", 0);
-        metric(metrics, cluster, "pulsar_throughput_in", 0);
-        metric(metrics, cluster, "pulsar_throughput_out", 0);
-        metric(metrics, cluster, "pulsar_storage_size", 0);
-        metric(metrics, cluster, "pulsar_storage_logical_size", 0);
-        metric(metrics, cluster, "pulsar_storage_write_rate", 0);
-        metric(metrics, cluster, "pulsar_storage_read_rate", 0);
-        metric(metrics, cluster, "pulsar_msg_backlog", 0);
+    private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster,
+                                              Map<String, Long> namespaceTopicsCount) {
+        stream.write("# TYPE ").write("pulsar_topics_count").write(" gauge\n");
+        stream.write("pulsar_topics_count")
+                .write("{cluster=\"").write(cluster).write("\"} ")
+                .write(0).write(' ').write(System.currentTimeMillis())
+                .write('\n');
+        namespaceTopicsCount.forEach((ns, topicCount) -> 
stream.write("pulsar_topics_count")
+                .write("{cluster=\"").write(cluster)
+                .write("\",namespace=\"").write(ns)
+                .write("\"} ")
+                .write(topicCount).write(' ').write(System.currentTimeMillis())
+                .write('\n')
+        );
     }
 
-    private static void printTopicsCountStats(Map<String, 
Collector.MetricFamilySamples> metrics, String cluster,
-                                              String namespace,
-                                              LongAdder topicsCount) {
-        metric(metrics, cluster, namespace, "pulsar_topics_count", 
topicsCount.sum());
+    private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster,
+                                            List<AggregatedNamespaceStats> stats) {
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_topics_count", 
stats, s -> s.topicsCount);
+        writeMetricWithBrokerDefault(stream, cluster, 
"pulsar_subscriptions_count", stats, s -> s.subscriptionsCount);
+        writeMetricWithBrokerDefault(stream, cluster, 
"pulsar_producers_count", stats, s -> s.producersCount);
+        writeMetricWithBrokerDefault(stream, cluster, 
"pulsar_consumers_count", stats, s -> s.consumersCount);
+
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_rate_in", stats, 
s -> s.rateIn);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_rate_out", 
stats, s -> s.rateOut);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_throughput_in", 
stats, s -> s.throughputIn);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_throughput_out", 
stats, s -> s.throughputOut);
+        writeMetric(stream, cluster, "pulsar_consumer_msg_ack_rate", stats, s 
-> s.messageAckRate);
+
+        writeMetric(stream, cluster, "pulsar_in_bytes_total", stats, s -> 
s.bytesInCounter);
+        writeMetric(stream, cluster, "pulsar_in_messages_total", stats, s -> 
s.msgInCounter);
+        writeMetric(stream, cluster, "pulsar_out_bytes_total", stats, s -> 
s.bytesOutCounter);
+        writeMetric(stream, cluster, "pulsar_out_messages_total", stats, s -> 
s.msgOutCounter);
+
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_size", 
stats,
+                s -> s.managedLedgerStats.storageSize);
+        writeMetricWithBrokerDefault(stream, cluster, 
"pulsar_storage_logical_size", stats,
+                s -> s.managedLedgerStats.storageLogicalSize);
+        writeMetric(stream, cluster, "pulsar_storage_backlog_size", stats, s 
-> s.managedLedgerStats.backlogSize);
+        writeMetric(stream, cluster, "pulsar_storage_offloaded_size",
+                stats, s -> s.managedLedgerStats.offloadedStorageUsed);
+
+        writeMetricWithBrokerDefault(stream, cluster, 
"pulsar_storage_write_rate", stats,
+                s -> s.managedLedgerStats.storageWriteRate);
+        writeMetricWithBrokerDefault(stream, cluster, 
"pulsar_storage_read_rate", stats,
+                s -> s.managedLedgerStats.storageReadRate);
+
+        writeMetric(stream, cluster, "pulsar_subscription_delayed", stats, s 
-> s.msgDelayed);
+
+        writeMsgBacklog(stream, cluster, stats, s -> s.msgBacklog);
+
+        stats.forEach(s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.refresh());
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_0_5", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[0]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_1", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[1]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_5", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[2]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_10", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[3]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_20", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[4]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_50", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[5]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_100", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[6]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_200", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[7]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_1000", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[8]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_overflow", 
stats,
+                s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[9]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_count",
+                stats, s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getCount());
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_sum",
+                stats, s -> 
s.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+
+        stats.forEach(s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh());
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_0_5", stats,
+                s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[0]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_1", stats,
+                s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[1]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_5", stats,
+                s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[2]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_10", stats,
+                s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[3]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_20", stats,
+                s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[4]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_50", stats,
+                s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[5]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_100", stats,
+                s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[6]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_200", stats,
+                s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[7]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_le_1000",
+                stats, s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[8]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_overflow",
+                stats, s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[9]);
+        writeMetric(stream, cluster, 
"pulsar_storage_ledger_write_latency_count",
+                stats, s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_sum",
+                stats, s -> 
s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+
+        stats.forEach(s -> s.managedLedgerStats.entrySizeBuckets.refresh());
+        writeMetric(stream, cluster, "pulsar_entry_size_le_128", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[0]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_512", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[1]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_1_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[2]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_2_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[3]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_4_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[4]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_16_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[5]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_100_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[6]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_1_mb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[7]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_overflow", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[8]);
+        writeMetric(stream, cluster, "pulsar_entry_size_count",
+                stats, s -> s.managedLedgerStats.entrySizeBuckets.getCount());
+        writeMetric(stream, cluster, "pulsar_entry_size_sum",
+                stats, s -> s.managedLedgerStats.entrySizeBuckets.getSum());
+
+        writeReplicationStat(stream, cluster, "pulsar_replication_rate_in", 
stats,
+                replStats -> replStats.msgRateIn);
+        writeReplicationStat(stream, cluster, "pulsar_replication_rate_out", 
stats,
+                replStats -> replStats.msgRateOut);
+        writeReplicationStat(stream, cluster, 
"pulsar_replication_throughput_in", stats,
+                replStats -> replStats.msgThroughputIn);
+        writeReplicationStat(stream, cluster, 
"pulsar_replication_throughput_out", stats,
+                replStats -> replStats.msgThroughputOut);
+        writeReplicationStat(stream, cluster, "pulsar_replication_backlog", 
stats,
+                replStats -> replStats.replicationBacklog);
+        writeReplicationStat(stream, cluster, 
"pulsar_replication_connected_count", stats,
+                replStats -> replStats.connectedCount);
+        writeReplicationStat(stream, cluster, 
"pulsar_replication_rate_expired", stats,
+                replStats -> replStats.msgRateExpired);
+        writeReplicationStat(stream, cluster, 
"pulsar_replication_delay_in_seconds", stats,
+                replStats -> replStats.replicationDelayInSeconds);
     }
 
-    private static void printNamespaceStats(Map<String, 
Collector.MetricFamilySamples> metrics, String cluster,
-                                            String namespace,
-                                            AggregatedNamespaceStats stats) {
-        metric(metrics, cluster, namespace, "pulsar_topics_count", 
stats.topicsCount);
-        metric(metrics, cluster, namespace, "pulsar_subscriptions_count", 
stats.subscriptionsCount);
-        metric(metrics, cluster, namespace, "pulsar_producers_count", 
stats.producersCount);
-        metric(metrics, cluster, namespace, "pulsar_consumers_count", 
stats.consumersCount);
-
-        metric(metrics, cluster, namespace, "pulsar_rate_in", stats.rateIn);
-        metric(metrics, cluster, namespace, "pulsar_rate_out", stats.rateOut);
-        metric(metrics, cluster, namespace, "pulsar_throughput_in", 
stats.throughputIn);
-        metric(metrics, cluster, namespace, "pulsar_throughput_out", 
stats.throughputOut);
-        metric(metrics, cluster, namespace, "pulsar_consumer_msg_ack_rate", 
stats.messageAckRate);
-
-        metric(metrics, cluster, namespace, "pulsar_in_bytes_total", 
stats.bytesInCounter);
-        metric(metrics, cluster, namespace, "pulsar_in_messages_total", 
stats.msgInCounter);
-        metric(metrics, cluster, namespace, "pulsar_out_bytes_total", 
stats.bytesOutCounter);
-        metric(metrics, cluster, namespace, "pulsar_out_messages_total", 
stats.msgOutCounter);
-
-        metric(metrics, cluster, namespace, "pulsar_storage_size", 
stats.managedLedgerStats.storageSize);
-        metric(metrics, cluster, namespace, "pulsar_storage_logical_size", 
stats.managedLedgerStats.storageLogicalSize);
-        metric(metrics, cluster, namespace, "pulsar_storage_backlog_size", 
stats.managedLedgerStats.backlogSize);
-        metric(metrics, cluster, namespace, "pulsar_storage_offloaded_size",
-                stats.managedLedgerStats.offloadedStorageUsed);
-
-        metric(metrics, cluster, namespace, "pulsar_storage_write_rate", 
stats.managedLedgerStats.storageWriteRate);
-        metric(metrics, cluster, namespace, "pulsar_storage_read_rate", 
stats.managedLedgerStats.storageReadRate);
-
-        metric(metrics, cluster, namespace, "pulsar_subscription_delayed", 
stats.msgDelayed);
-
-        metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_msg_backlog", "local", stats.msgBacklog);
-
-        stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
-        long[] latencyBuckets = 
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_1", latencyBuckets[1]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_5", latencyBuckets[2]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_10", latencyBuckets[3]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_20", latencyBuckets[4]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_50", latencyBuckets[5]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_100", latencyBuckets[6]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_200", latencyBuckets[7]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_overflow", latencyBuckets[9]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_write_latency_count",
-                
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
-
-        stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
-        long[] ledgerWritelatencyBuckets = 
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_le_1000",
-                ledgerWritelatencyBuckets[8]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_overflow",
-                ledgerWritelatencyBuckets[9]);
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_count",
-                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
-        metric(metrics, cluster, namespace, 
"pulsar_storage_ledger_write_latency_sum",
-                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
-
-        stats.managedLedgerStats.entrySizeBuckets.refresh();
-        long[] entrySizeBuckets = 
stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_128", 
entrySizeBuckets[0]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_512", 
entrySizeBuckets[1]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_1_kb", 
entrySizeBuckets[2]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_2_kb", 
entrySizeBuckets[3]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_4_kb", 
entrySizeBuckets[4]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_16_kb", 
entrySizeBuckets[5]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_100_kb", 
entrySizeBuckets[6]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_1_mb", 
entrySizeBuckets[7]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_overflow", 
entrySizeBuckets[8]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_count",
-                stats.managedLedgerStats.entrySizeBuckets.getCount());
-        metric(metrics, cluster, namespace, "pulsar_entry_size_sum",
-                stats.managedLedgerStats.entrySizeBuckets.getSum());
-
-        if (!stats.replicationStats.isEmpty()) {
-            stats.replicationStats.forEach((remoteCluster, replStats) -> {
-                metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_replication_rate_in", remoteCluster,
-                        replStats.msgRateIn);
-                metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_replication_rate_out", remoteCluster,
-                        replStats.msgRateOut);
-                metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_replication_throughput_in", remoteCluster,
-                        replStats.msgThroughputIn);
-                metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_replication_throughput_out", remoteCluster,
-                        replStats.msgThroughputOut);
-                metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_replication_backlog", remoteCluster,
-                        replStats.replicationBacklog);
-                metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_replication_connected_count",
-                        remoteCluster,
-                        replStats.connectedCount);
-                metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_replication_rate_expired", remoteCluster,
-                        replStats.msgRateExpired);
-                metricWithRemoteCluster(metrics, cluster, namespace, 
"pulsar_replication_delay_in_seconds",
-                        remoteCluster, replStats.replicationDelayInSeconds);
-            });
-        }
+    private static void writeMetricWithBrokerDefault(SimpleTextOutputStream stream, String cluster, String name,
+                                                     List<AggregatedNamespaceStats> allNamespaceStats,
+                                                     Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+        stream.write("# TYPE ").write(name).write(" gauge\n");
+        stream.write(name)

Review Comment:
   The current behaviour is to always write the zero-valued default broker metrics, even when the broker has no topics or traffic:
   ```java
   private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
           // Print metrics with 0 values. This is necessary to have the available brokers being
           // reported in the brokers dashboard even if they don't have any topic or traffic
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to