asafm commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r938020746


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java:
##########
@@ -102,380 +104,401 @@ public void reset() {
         compactionLatencyBuckets.reset();
     }
 
-    static void resetTypes() {
-        metricWithTypeDefinition.clear();
-    }
-
-    static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                                TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
-                                boolean splitTopicAndPartitionIndexLabel) {
-        metric(stream, cluster, namespace, topic, 
"pulsar_subscriptions_count", stats.subscriptionsCount,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_producers_count", 
stats.producersCount,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_consumers_count", 
stats.consumersCount,
-                splitTopicAndPartitionIndexLabel);
-
-        metric(stream, cluster, namespace, topic, "pulsar_rate_in", 
stats.rateIn,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_rate_out", 
stats.rateOut,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_throughput_in", 
stats.throughputIn,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_throughput_out", 
stats.throughputOut,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", 
stats.averageMsgSize,
-                splitTopicAndPartitionIndexLabel);
-
-        metric(stream, cluster, namespace, topic, "pulsar_storage_size", 
stats.managedLedgerStats.storageSize,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_logical_size",
-                stats.managedLedgerStats.storageLogicalSize, 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", 
stats.msgBacklog,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_backlog_size",
-                stats.managedLedgerStats.backlogSize, 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_offloaded_size", stats.managedLedgerStats
-                .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_backlog_quota_limit_time",
-                stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
-
-        long[] latencyBuckets = 
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_1", latencyBuckets[1],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_5", latencyBuckets[2],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_10", latencyBuckets[3],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_20", latencyBuckets[4],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_50", latencyBuckets[5],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_100", latencyBuckets[6],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_200", latencyBuckets[7],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_le_1000", latencyBuckets[8],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_overflow", latencyBuckets[9],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_count",
-                
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), 
splitTopicAndPartitionIndexLabel);
-
-        long[] ledgerWriteLatencyBuckets = 
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_0_5",
-                ledgerWriteLatencyBuckets[0], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_1",
-                ledgerWriteLatencyBuckets[1], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_5",
-                ledgerWriteLatencyBuckets[2], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_10",
-                ledgerWriteLatencyBuckets[3], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_20",
-                ledgerWriteLatencyBuckets[4], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_50",
-                ledgerWriteLatencyBuckets[5], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_100",
-                ledgerWriteLatencyBuckets[6], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_200",
-                ledgerWriteLatencyBuckets[7], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_le_1000",
-                ledgerWriteLatencyBuckets[8], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_overflow",
-                ledgerWriteLatencyBuckets[9], 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_count",
+    public static void printTopicStats(Map<String, ByteBuf> allMetrics, TopicStats stats,
+                                       Optional<CompactorMXBean> compactorMXBean, String cluster, String namespace,
+                                       String name, boolean splitTopicAndPartitionIndexLabel) {
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_subscriptions_count", 
stats.subscriptionsCount,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_producers_count", 
stats.producersCount,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_consumers_count", 
stats.consumersCount,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_rate_in", 
stats.rateIn,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_rate_out", 
stats.rateOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_throughput_in", 
stats.throughputIn,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_throughput_out", 
stats.throughputOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_average_msg_size", 
stats.averageMsgSize,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_size", 
stats.managedLedgerStats.storageSize,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_logical_size",
+                stats.managedLedgerStats.storageLogicalSize, cluster, 
namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_msg_backlog", 
stats.msgBacklog,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_backlog_size", 
stats.managedLedgerStats.backlogSize,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_publish_rate_limit_times", 
stats.publishRateLimitedTimes,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_offloaded_size", 
stats.managedLedgerStats
+                .offloadedStorageUsed, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_backlog_quota_limit", 
stats.backlogQuotaLimit,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_backlog_quota_limit_time", 
stats.backlogQuotaLimitTime,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_0_5",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[0],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_1",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[1],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_5",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[2],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_10",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[3],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_20",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[4],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_50",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[5],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_100",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[6],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_200",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[7],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_1000",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[8],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_overflow",
+                
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[9],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_count",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_sum",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), 
cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_0_5",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[0],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_1",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[1],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_5",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[2],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_10",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[3],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_20",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[4],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_50",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[5],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_100",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[6],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_200",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[7],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_1000",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[8],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_overflow",
+                
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[9],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_count",
                 
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_sum",
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_sum",
                 
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetric(allMetrics, "pulsar_entry_size_le_128",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[0],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_512",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[1],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_1_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[2],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_2_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[3],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_4_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[4],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_16_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[5],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_100_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[6],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_1_mb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[7],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_overflow",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[8],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_count", 
stats.managedLedgerStats.entrySizeBuckets.getCount(),
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_sum", 
stats.managedLedgerStats.entrySizeBuckets.getSum(),
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeProducerStat(allMetrics, "pulsar_producer_msg_rate_in", stats,
+                p -> p.msgRateIn, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeProducerStat(allMetrics, "pulsar_producer_msg_throughput_in", 
stats,
+                p -> p.msgThroughputIn, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeProducerStat(allMetrics, "pulsar_producer_msg_average_Size", 
stats,
+                p -> p.averageMsgSize, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+
+
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_back_log", 
stats, s -> s.msgBacklog,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_back_log_no_delayed",
+                stats, s -> s.msgBacklogNoDelayed, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_delayed",
+                stats, s -> s.msgDelayed, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_msg_rate_redeliver",
+                stats, s -> s.msgRateRedeliver, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_unacked_messages",
+                stats, s -> s.unackedMessages, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_blocked_on_unacked_messages",
+                stats, s -> s.blockedSubscriptionOnUnackedMsgs ? 1 : 0, 
cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_rate_out",
+                stats, s -> s.msgRateOut, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_ack_rate",
+                stats, s -> s.messageAckRate, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_msg_throughput_out",
+                stats, s -> s.msgThroughputOut, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_out_bytes_total",
+                stats, s -> s.bytesOutCounter, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_out_messages_total",
+                stats, s -> s.msgOutCounter, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_last_expire_timestamp",
+                stats, s -> s.lastExpireTimestamp, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_last_acked_timestamp",
+                stats, s -> s.lastAckedTimestamp, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_last_consumed_flow_timestamp",
+                stats, s -> s.lastConsumedFlowTimestamp, cluster, namespace, 
name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_last_consumed_timestamp",
+                stats, s -> s.lastConsumedTimestamp, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_last_mark_delete_advanced_timestamp",
+                stats, s -> s.lastMarkDeleteAdvancedTimestamp, cluster, 
namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_msg_rate_expired",
+                stats, s -> s.msgRateExpired, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_total_msg_expired",
+                stats, s -> s.totalMsgExpired, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_drop_rate",
+                stats, s -> s.msgDropRate, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, 
"pulsar_subscription_consumers_count",
+                stats, s -> s.consumersCount, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+
+        writeConsumerStat(allMetrics, "pulsar_consumer_msg_rate_redeliver", 
stats, c -> c.msgRateRedeliver,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_consumer_unacked_messages", 
stats, c -> c.unackedMessages,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, 
"pulsar_consumer_blocked_on_unacked_messages",
+                stats, c -> c.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_consumer_msg_rate_out", stats, c 
-> c.msgRateOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeConsumerStat(allMetrics, "pulsar_consumer_msg_ack_rate", stats, c 
-> c.msgAckRate,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeConsumerStat(allMetrics, "pulsar_consumer_msg_throughput_out", 
stats, c -> c.msgThroughputOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_consumer_available_permits", 
stats, c -> c.availablePermits,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_out_bytes_total", stats, c -> 
c.bytesOutCounter,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_out_messages_total", stats, c -> 
c.msgOutCounter,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_in", stats, 
r -> r.msgRateIn,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_out", stats, 
r -> r.msgRateOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_throughput_in", 
stats, r -> r.msgThroughputIn,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_throughput_out", 
stats, r -> r.msgThroughputOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_backlog", stats, 
r -> r.replicationBacklog,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_connected_count", 
stats, r -> r.connectedCount,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_expired", 
stats, r -> r.msgRateExpired,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, 
"pulsar_replication_delay_in_seconds", stats,
+                r -> r.replicationDelayInSeconds, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+
+        writeMetric(allMetrics, "pulsar_in_bytes_total", stats.bytesInCounter, 
cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_in_messages_total", 
stats.msgInCounter, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
 
-        long[] entrySizeBuckets = 
stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", 
entrySizeBuckets[0],
+        // Compaction
+
+        writeCompactionStat(allMetrics, 
"pulsar_compaction_removed_event_count", stats.compactionRemovedEventCount,
+                compactorMXBean, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_succeed_count", 
stats.compactionSucceedCount,
+                compactorMXBean, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_failed_count", 
stats.compactionFailedCount, compactorMXBean,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, 
"pulsar_compaction_duration_time_in_mills", stats.compactionDurationTimeInMills,
+                compactorMXBean, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_read_throughput", 
stats.compactionReadThroughput,
+                compactorMXBean, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_write_throughput", 
stats.compactionWriteThroughput,
+                compactorMXBean, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, 
"pulsar_compaction_compacted_entries_count",
+                stats.compactionCompactedEntriesCount, compactorMXBean, 
cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", 
entrySizeBuckets[1],
+        writeCompactionStat(allMetrics, 
"pulsar_compaction_compacted_entries_size",
+                stats.compactionCompactedEntriesSize, compactorMXBean, 
cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", 
entrySizeBuckets[2],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_0_5",
+                stats.compactionLatencyBuckets.getBuckets()[0], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", 
entrySizeBuckets[3],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_1",
+                stats.compactionLatencyBuckets.getBuckets()[1], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", 
entrySizeBuckets[4],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_5",
+                stats.compactionLatencyBuckets.getBuckets()[2], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_10",
+                stats.compactionLatencyBuckets.getBuckets()[3], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_20",
+                stats.compactionLatencyBuckets.getBuckets()[4], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", 
entrySizeBuckets[7],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_50",
+                stats.compactionLatencyBuckets.getBuckets()[5], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, 
"pulsar_entry_size_le_overflow", entrySizeBuckets[8],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_100",
+                stats.compactionLatencyBuckets.getBuckets()[6], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
-                stats.managedLedgerStats.entrySizeBuckets.getCount(), 
splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
-                stats.managedLedgerStats.entrySizeBuckets.getSum(), 
splitTopicAndPartitionIndexLabel);
-
-        stats.producerStats.forEach((p, producerStats) -> {
-            metric(stream, cluster, namespace, topic, p, 
producerStats.producerId, "pulsar_producer_msg_rate_in",
-                    producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, p, 
producerStats.producerId, "pulsar_producer_msg_throughput_in",
-                    producerStats.msgThroughputIn, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, p, 
producerStats.producerId, "pulsar_producer_msg_average_Size",
-                    producerStats.averageMsgSize, 
splitTopicAndPartitionIndexLabel);
-        });
-
-        stats.subscriptionStats.forEach((n, subsStats) -> {
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_back_log",
-                    subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_back_log_no_delayed",
-                    subsStats.msgBacklogNoDelayed, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_delayed",
-                    subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_rate_redeliver",
-                    subsStats.msgRateRedeliver, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_unacked_messages",
-                    subsStats.unackedMessages, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_blocked_on_unacked_messages",
-                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_rate_out",
-                    subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_ack_rate",
-                    subsStats.messageAckRate, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_throughput_out",
-                    subsStats.msgThroughputOut, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_out_bytes_total",
-                    subsStats.bytesOutCounter, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_out_messages_total",
-                    subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_last_expire_timestamp",
-                    subsStats.lastExpireTimestamp, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_last_acked_timestamp",
-                subsStats.lastAckedTimestamp, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_last_consumed_flow_timestamp",
-                subsStats.lastConsumedFlowTimestamp, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_last_consumed_timestamp",
-                subsStats.lastConsumedTimestamp, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_last_mark_delete_advanced_timestamp",
-                subsStats.lastMarkDeleteAdvancedTimestamp, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_rate_expired",
-                    subsStats.msgRateExpired, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_total_msg_expired",
-                    subsStats.totalMsgExpired, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_msg_drop_rate",
-                    subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_consumers_count",
-                    subsStats.consumersCount, 
splitTopicAndPartitionIndexLabel);
-            subsStats.consumerStat.forEach((c, consumerStats) -> {
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_consumer_msg_rate_redeliver", 
consumerStats.msgRateRedeliver,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_consumer_unacked_messages", 
consumerStats.unackedMessages,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_consumer_blocked_on_unacked_messages",
-                        consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_consumer_msg_rate_out", 
consumerStats.msgRateOut,
-                        splitTopicAndPartitionIndexLabel);
-
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_consumer_msg_ack_rate", 
consumerStats.msgAckRate,
-                        splitTopicAndPartitionIndexLabel);
-
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_consumer_msg_throughput_out", 
consumerStats.msgThroughputOut,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_consumer_available_permits", 
consumerStats.availablePermits,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_out_bytes_total", 
consumerStats.bytesOutCounter,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), 
c.consumerId(),
-                        "pulsar_out_messages_total", 
consumerStats.msgOutCounter,
-                        splitTopicAndPartitionIndexLabel);
-            });
-        });
-
-        if (!stats.replicationStats.isEmpty()) {
-            stats.replicationStats.forEach((remoteCluster, replStats) -> {
-                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_rate_in", remoteCluster,
-                        replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_rate_out", remoteCluster,
-                        replStats.msgRateOut, 
splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_throughput_in",
-                        remoteCluster, replStats.msgThroughputIn, 
splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_throughput_out",
-                        remoteCluster, replStats.msgThroughputOut, 
splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_backlog", remoteCluster,
-                        replStats.replicationBacklog, 
splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_connected_count",
-                        remoteCluster, replStats.connectedCount, 
splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_rate_expired",
-                        remoteCluster, replStats.msgRateExpired, 
splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, 
"pulsar_replication_delay_in_seconds",
-                        remoteCluster, replStats.replicationDelayInSeconds, 
splitTopicAndPartitionIndexLabel);
-            });
-        }
-
-        metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", 
stats.bytesInCounter,
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_200",
+                stats.compactionLatencyBuckets.getBuckets()[7], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", 
stats.msgInCounter,
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_1000",
+                stats.compactionLatencyBuckets.getBuckets()[8], 
compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-
-        // Compaction
-        boolean hasCompaction = compactorMXBean.flatMap(mxBean -> 
mxBean.getCompactionRecordForTopic(topic))
-                .map(__ -> true).orElse(false);
-        if (hasCompaction) {
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_removed_event_count",
-                    stats.compactionRemovedEventCount, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_succeed_count",
-                    stats.compactionSucceedCount, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_failed_count",
-                    stats.compactionFailedCount, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_duration_time_in_mills",
-                    stats.compactionDurationTimeInMills, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_read_throughput",
-                    stats.compactionReadThroughput, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_write_throughput",
-                    stats.compactionWriteThroughput, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_compacted_entries_count",
-                    stats.compactionCompactedEntriesCount, 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_compacted_entries_size",
-                    stats.compactionCompactedEntriesSize, 
splitTopicAndPartitionIndexLabel);
-            long[] compactionLatencyBuckets = 
stats.compactionLatencyBuckets.getBuckets();
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_0_5",
-                    compactionLatencyBuckets[0], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_1",
-                    compactionLatencyBuckets[1], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_5",
-                    compactionLatencyBuckets[2], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_10",
-                    compactionLatencyBuckets[3], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_20",
-                    compactionLatencyBuckets[4], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_50",
-                    compactionLatencyBuckets[5], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_100",
-                    compactionLatencyBuckets[6], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_200",
-                    compactionLatencyBuckets[7], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_le_1000",
-                    compactionLatencyBuckets[8], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_overflow",
-                    compactionLatencyBuckets[9], 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_sum",
-                    stats.compactionLatencyBuckets.getSum(), 
splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, 
"pulsar_compaction_latency_count",
-                    stats.compactionLatencyBuckets.getCount(), 
splitTopicAndPartitionIndexLabel);
-        }
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_overflow",
+                stats.compactionLatencyBuckets.getBuckets()[9], 
compactorMXBean, cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_sum", 
stats.compactionLatencyBuckets.getSum(),
+                compactorMXBean, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_count", 
stats.compactionLatencyBuckets.getCount(),
+                compactorMXBean, cluster, namespace, name, 
splitTopicAndPartitionIndexLabel);
     }
 
-    static void metricType(SimpleTextOutputStream stream, String name) {
-
-        if (!metricWithTypeDefinition.containsKey(name)) {
-            metricWithTypeDefinition.put(name, "gauge");
-            stream.write("# TYPE ").write(name).write(" gauge\n");
-        }
-
+    private static void writeMetric(Map<String, ByteBuf> allMetrics, String 
metricName, Number value, String cluster,
+                                    String namespace, String topic, boolean 
splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        writeTopicSample(buffer, metricName, value, cluster, namespace, topic, 
splitTopicAndPartitionIndexLabel);
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String name, double value, boolean 
splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeProducerStat(Map<String, ByteBuf> allMetrics, 
String metricName, TopicStats topicStats,
+                                          Function<AggregatedProducerStats, 
Number> valueFunction,
+                                          String cluster, String namespace, 
String topic,
+                                          boolean 
splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        topicStats.producerStats.forEach((p, producerStats) ->
+                writeTopicSample(buffer, metricName, 
valueFunction.apply(producerStats), cluster, namespace, topic,
+                        splitTopicAndPartitionIndexLabel, "producer_name", p, 
"producer_id",
+                        String.valueOf(producerStats.producerId)));
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-           String subscription, String name, long value, boolean 
splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeSubscriptionStat(Map<String, ByteBuf> allMetrics, 
String metricName, TopicStats topicStats,
+                                              
Function<AggregatedSubscriptionStats, Number> valueFunction,
+                                              String cluster, String 
namespace, String topic,
+                                              boolean 
splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        topicStats.subscriptionStats.forEach((s, subStats) ->
+                writeTopicSample(buffer, metricName, 
valueFunction.apply(subStats), cluster, namespace, topic,
+                        splitTopicAndPartitionIndexLabel, "subscription", s));
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String producerName, long produceId, String name, double value, 
boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",producer_name=\"").write(producerName)
-                .write("\",producer_id=\"").write(produceId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeConsumerStat(Map<String, ByteBuf> allMetrics, 
String metricName, TopicStats topicStats,
+                                          Function<AggregatedConsumerStats, 
Number> valueFunction,
+                                          String cluster, String namespace, 
String topic,
+                                          boolean 
splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        topicStats.subscriptionStats.forEach((s, subStats) ->
+                subStats.consumerStat.forEach((c, conStats) ->
+                        writeTopicSample(buffer, metricName, 
valueFunction.apply(conStats), cluster, namespace, topic,
+                                splitTopicAndPartitionIndexLabel, 
"subscription", s, "consumer_name", c.consumerName(),
+                                "consumer_id", String.valueOf(c.consumerId()))
+                ));
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String subscription, String name, double value, boolean 
splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
-    }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String 
name, long value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
-                .write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeReplicationStat(Map<String, ByteBuf> allMetrics, 
String metricName, TopicStats topicStats,
+                                             
Function<AggregatedReplicationStats, Number> valueFunction,
+                                             String cluster, String namespace, 
String topic,
+                                             boolean 
splitTopicAndPartitionIndexLabel) {
+        if (!topicStats.replicationStats.isEmpty()) {
+            ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+            topicStats.replicationStats.forEach((remoteCluster, replStats) ->
+                    writeTopicSample(buffer, metricName, 
valueFunction.apply(replStats), cluster, namespace, topic,
+                            splitTopicAndPartitionIndexLabel, 
"remote_cluster", remoteCluster)
+            );
+        }
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, 
String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String 
name, double value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
-                .write(consumerId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeCompactionStat(Map<String, ByteBuf> allMetrics, 
String metricName,
+                                            Number value, 
Optional<CompactorMXBean> compactorMXBean,
+                                            String cluster, String namespace, 
String topic,
+                                            boolean 
splitTopicAndPartitionIndexLabel) {
+        boolean hasCompaction = compactorMXBean.flatMap(mxBean -> 
mxBean.getCompactionRecordForTopic(topic))
+                .isPresent();
+        if (hasCompaction) {
+            ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+            writeTopicSample(buffer, metricName, value, cluster, namespace, 
topic, splitTopicAndPartitionIndexLabel);
+        }
     }
 
-    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, 
String cluster, String namespace,
-            String topic, String name, String remoteCluster, double value, 
boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, 
splitTopicAndPartitionIndexLabel)
-                .write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    static void writeMetricWithBrokerDefault(Map<String, ByteBuf> allMetrics, 
String metricName, Number value,
+                                             String cluster, String namespace, 
String topic,
+                                             boolean 
splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeTypeWithBrokerDefault(allMetrics, 
metricName, cluster);
+        writeTopicSample(buffer, metricName, value, cluster, namespace, topic, 
splitTopicAndPartitionIndexLabel);
     }
 
-    private static SimpleTextOutputStream 
appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
-            String namespace, String topic, String name, boolean 
splitTopicAndPartitionIndexLabel) {
-        
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+    static void writeTopicSample(ByteBuf buffer, String metricName, Number 
value, String cluster,
+                                 String namespace, String topic, boolean 
splitTopicAndPartitionIndexLabel,
+                                 String... extraLabelsAndValues) {

Review Comment:
   @codelipenghui is OK with this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to