asafm commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r913071081


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java:
##########
@@ -296,161 +306,264 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
                 });
     }
 
-    private static void printDefaultBrokerStats(SimpleTextOutputStream stream, 
String cluster) {
-        // Print metrics with 0 values. This is necessary to have the 
available brokers being
-        // reported in the brokers dashboard even if they don't have any topic 
or traffic
-        metric(stream, cluster, "pulsar_topics_count", 0);
-        metric(stream, cluster, "pulsar_subscriptions_count", 0);
-        metric(stream, cluster, "pulsar_producers_count", 0);
-        metric(stream, cluster, "pulsar_consumers_count", 0);
-        metric(stream, cluster, "pulsar_rate_in", 0);
-        metric(stream, cluster, "pulsar_rate_out", 0);
-        metric(stream, cluster, "pulsar_throughput_in", 0);
-        metric(stream, cluster, "pulsar_throughput_out", 0);
-        metric(stream, cluster, "pulsar_storage_size", 0);
-        metric(stream, cluster, "pulsar_storage_logical_size", 0);
-        metric(stream, cluster, "pulsar_storage_write_rate", 0);
-        metric(stream, cluster, "pulsar_storage_read_rate", 0);
-        metric(stream, cluster, "pulsar_msg_backlog", 0);
+    private static void printTopicsCountStats(Map<String, ByteBuf> allMetrics, 
Map<String, Long> namespaceTopicsCount,
+                                              String cluster) {
+        namespaceTopicsCount.forEach((ns, topicCount) ->
+                writeMetricWithBrokerDefault(allMetrics, 
"pulsar_topics_count", topicCount, cluster, ns)
+        );
     }
 
-    private static void printTopicsCountStats(SimpleTextOutputStream stream, 
String cluster, String namespace,
-                                              LongAdder topicsCount) {
-        metric(stream, cluster, namespace, "pulsar_topics_count", 
topicsCount.sum());
-    }
+    private static void printNamespaceStats(Map<String, ByteBuf> allMetrics, 
AggregatedNamespaceStats stats,
+                                            String cluster, String namespace) {
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_topics_count", 
stats.topicsCount, cluster, namespace);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_subscriptions_count", 
stats.subscriptionsCount, cluster,
+                namespace);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_producers_count", 
stats.producersCount, cluster, namespace);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_consumers_count", 
stats.consumersCount, cluster, namespace);
+
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_rate_in", 
stats.rateIn, cluster, namespace);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_rate_out", 
stats.rateOut, cluster, namespace);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_throughput_in", 
stats.throughputIn, cluster, namespace);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_throughput_out", 
stats.throughputOut, cluster, namespace);
+        writeMetric(allMetrics, "pulsar_consumer_msg_ack_rate", 
stats.messageAckRate, cluster, namespace);
+
+        writeMetric(allMetrics, "pulsar_in_bytes_total", stats.bytesInCounter, 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_in_messages_total", 
stats.msgInCounter, cluster, namespace);
+        writeMetric(allMetrics, "pulsar_out_bytes_total", 
stats.bytesOutCounter, cluster, namespace);
+        writeMetric(allMetrics, "pulsar_out_messages_total", 
stats.msgOutCounter, cluster, namespace);
+
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_size", 
stats.managedLedgerStats.storageSize, cluster,
+                namespace);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_logical_size",
+                stats.managedLedgerStats.storageLogicalSize, cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_backlog_size", 
stats.managedLedgerStats.backlogSize, cluster,
+                namespace);
+        writeMetric(allMetrics, "pulsar_storage_offloaded_size",
+                stats.managedLedgerStats.offloadedStorageUsed, cluster, 
namespace);
+
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_write_rate", 
stats.managedLedgerStats.storageWriteRate,
+                cluster, namespace);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_read_rate", 
stats.managedLedgerStats.storageReadRate,
+                cluster, namespace);
+
+        writeMetric(allMetrics, "pulsar_subscription_delayed", 
stats.msgDelayed, cluster, namespace);
+
+        writePulsarMsgBacklog(allMetrics, stats.msgBacklog, cluster, 
namespace);
 
-    private static void printNamespaceStats(SimpleTextOutputStream stream, 
String cluster, String namespace,
-                                            AggregatedNamespaceStats stats) {
-        metric(stream, cluster, namespace, "pulsar_topics_count", 
stats.topicsCount);
-        metric(stream, cluster, namespace, "pulsar_subscriptions_count", 
stats.subscriptionsCount);
-        metric(stream, cluster, namespace, "pulsar_producers_count", 
stats.producersCount);
-        metric(stream, cluster, namespace, "pulsar_consumers_count", 
stats.consumersCount);
+        stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_0_5",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[0], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_1",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[1], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_5",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[2], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_10",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[3], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_20",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[4], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_50",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[5], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_100",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[6], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_200",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[7], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_1000",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[8], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_overflow",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[9], cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_count",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_sum",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), 
cluster, namespace);
 
-        metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
-        metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
-        metric(stream, cluster, namespace, "pulsar_throughput_in", 
stats.throughputIn);
-        metric(stream, cluster, namespace, "pulsar_throughput_out", 
stats.throughputOut);
-        metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", 
stats.messageAckRate);
+        stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_0_5",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[0], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_1",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[1], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_5",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[2], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_10",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[3], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_20",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[4], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_50",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[5], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_100",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[6], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_200",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[7], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_1000",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[8], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_overflow",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[9], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_count",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_sum",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), cluster, 
namespace);
 
-        metric(stream, cluster, namespace, "pulsar_in_bytes_total", 
stats.bytesInCounter);
-        metric(stream, cluster, namespace, "pulsar_in_messages_total", 
stats.msgInCounter);
-        metric(stream, cluster, namespace, "pulsar_out_bytes_total", 
stats.bytesOutCounter);
-        metric(stream, cluster, namespace, "pulsar_out_messages_total", 
stats.msgOutCounter);
+        stats.managedLedgerStats.entrySizeBuckets.refresh();
+        writeMetric(allMetrics, "pulsar_entry_size_le_128", 
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[0],
+                cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_le_512", 
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[1],
+                cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_le_1_kb", 
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[2],
+                cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_le_2_kb", 
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[3],
+                cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_le_4_kb", 
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[4],
+                cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_le_16_kb", 
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[5],
+                cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_le_100_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[6], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_le_1_mb", 
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[7],
+                cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_le_overflow",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[8], 
cluster, namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_count",
+                stats.managedLedgerStats.entrySizeBuckets.getCount(), cluster, 
namespace);
+        writeMetric(allMetrics, "pulsar_entry_size_sum",
+                stats.managedLedgerStats.entrySizeBuckets.getSum(), cluster, 
namespace);
+
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_in", stats,
+                replStats -> replStats.msgRateIn, cluster, namespace);
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_out", stats,
+                replStats -> replStats.msgRateOut, cluster, namespace);
+        writeReplicationStat(allMetrics, "pulsar_replication_throughput_in", 
stats,
+                replStats -> replStats.msgThroughputIn, cluster, namespace);
+        writeReplicationStat(allMetrics, "pulsar_replication_throughput_out", 
stats,
+                replStats -> replStats.msgThroughputOut, cluster, namespace);
+        writeReplicationStat(allMetrics, "pulsar_replication_backlog", stats,
+                replStats -> replStats.replicationBacklog, cluster, namespace);
+        writeReplicationStat(allMetrics, "pulsar_replication_connected_count", 
stats,
+                replStats -> replStats.connectedCount, cluster, namespace);
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_expired", 
stats,
+                replStats -> replStats.msgRateExpired, cluster, namespace);
+        writeReplicationStat(allMetrics, 
"pulsar_replication_delay_in_seconds", stats,
+                replStats -> replStats.replicationDelayInSeconds, cluster, 
namespace);
+    }
 
-        metric(stream, cluster, namespace, "pulsar_storage_size", 
stats.managedLedgerStats.storageSize);
-        metric(stream, cluster, namespace, "pulsar_storage_logical_size", 
stats.managedLedgerStats.storageLogicalSize);
-        metric(stream, cluster, namespace, "pulsar_storage_backlog_size", 
stats.managedLedgerStats.backlogSize);
-        metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
-                stats.managedLedgerStats.offloadedStorageUsed);
+    private static void writeMetricWithBrokerDefault(Map<String, ByteBuf> 
allMetrics, String metricName, Number value,
+                                                     String cluster, String 
namespace) {
+        ByteBuf buffer = writeGaugeTypeWithBrokerDefault(allMetrics, 
metricName, cluster);
+        writeSample(buffer, metricName, value, "cluster", cluster, 
"namespace", namespace);
+    }
 
-        metric(stream, cluster, namespace, "pulsar_storage_write_rate", 
stats.managedLedgerStats.storageWriteRate);
-        metric(stream, cluster, namespace, "pulsar_storage_read_rate", 
stats.managedLedgerStats.storageReadRate);
+    private static void writePulsarMsgBacklog(Map<String, ByteBuf> allMetrics, 
Number value,
+                                              String cluster, String 
namespace) {
+        ByteBuf buffer = writeGaugeTypeWithBrokerDefault(allMetrics, 
"pulsar_msg_backlog", cluster);
+        writeSample(buffer, "pulsar_msg_backlog", value, "cluster", cluster, 
"namespace", namespace, "remote_cluster",
+                "local");
+    }
 
-        metric(stream, cluster, namespace, "pulsar_subscription_delayed", 
stats.msgDelayed);
+    private static void writeMetric(Map<String, ByteBuf> allMetrics, String 
metricName, Number value, String cluster,
+                                    String namespace) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        writeSample(buffer, metricName, value, "cluster", cluster, 
"namespace", namespace);
+    }
 
-        metricWithRemoteCluster(stream, cluster, namespace, 
"pulsar_msg_backlog", "local", stats.msgBacklog);
+    private static void writeReplicationStat(Map<String, ByteBuf> allMetrics, 
String metricName,
+                                             AggregatedNamespaceStats 
namespaceStats,
+                                             
Function<AggregatedReplicationStats, Number> replStatsFunction,
+                                             String cluster, String namespace) 
{
+        if (!namespaceStats.replicationStats.isEmpty()) {
+            ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+            namespaceStats.replicationStats.forEach((remoteCluster, replStats) 
->
+                    writeSample(buffer, metricName, 
replStatsFunction.apply(replStats),
+                            "cluster", cluster,
+                            "namespace", namespace,
+                            "remote_cluster", remoteCluster)
+            );
+        }
+    }
 
-        stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
-        long[] latencyBuckets = 
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_1", latencyBuckets[1]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_5", latencyBuckets[2]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_10", latencyBuckets[3]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_20", latencyBuckets[4]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_50", latencyBuckets[5]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_100", latencyBuckets[6]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_200", latencyBuckets[7]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_overflow", latencyBuckets[9]);
-        metric(stream, cluster, namespace, 
"pulsar_storage_write_latency_count",
-                
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+    static ByteBuf writeGaugeType(Map<String, ByteBuf> allMetrics, String 
metricName) {
+        if (!allMetrics.containsKey(metricName)) {
+            ByteBuf buffer = 
UnpooledByteBufAllocator.DEFAULT.compositeDirectBuffer(MAX_COMPONENTS);

Review Comment:
   @merlimat, @tjiuming - do you think we should use a direct buffer here (`compositeDirectBuffer`), or would a heap buffer be sufficient?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to