asafm commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r938020746
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java:
##########
@@ -102,380 +104,401 @@ public void reset() {
         compactionLatencyBuckets.reset();
     }
-    static void resetTypes() {
-        metricWithTypeDefinition.clear();
-    }
-
-    static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
-                boolean splitTopicAndPartitionIndexLabel) {
-        metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount,
-                splitTopicAndPartitionIndexLabel);
-
-        metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
-                splitTopicAndPartitionIndexLabel);
-
-        metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
-                stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
-                stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
-                .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
-                stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
-
-        long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
-
-        long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
-                ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
-                ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
-                ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
-                ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
-                ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
-                ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
-                ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
-                ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
-                ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
-                ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
+    public static void printTopicStats(Map<String, ByteBuf> allMetrics, TopicStats stats,
+                Optional<CompactorMXBean> compactorMXBean, String cluster, String namespace,
+                String name, boolean splitTopicAndPartitionIndexLabel) {
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_subscriptions_count", stats.subscriptionsCount,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_producers_count", stats.producersCount,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_consumers_count", stats.consumersCount,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_rate_in", stats.rateIn,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_rate_out", stats.rateOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_throughput_in", stats.throughputIn,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_throughput_out", stats.throughputOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_average_msg_size", stats.averageMsgSize,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_logical_size",
+                stats.managedLedgerStats.storageLogicalSize, cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeMetricWithBrokerDefault(allMetrics, "pulsar_msg_backlog", stats.msgBacklog,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_offloaded_size", stats.managedLedgerStats
+                .offloadedStorageUsed, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_0_5",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[0],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_1",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[1],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_5",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[2],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_10",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[3],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_20",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[4],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_50",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[5],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_100",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[6],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_200",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[7],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_le_1000",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[8],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_overflow",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[9],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_count",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_write_latency_sum",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_0_5",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[0],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_1",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[1],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_5",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[2],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_10",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[3],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_20",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[4],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_50",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[5],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_100",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[6],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_200",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[7],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_1000",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[8],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_overflow",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[9],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_count",
                 stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_sum",
                 stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetric(allMetrics, "pulsar_entry_size_le_128",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[0],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_512",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[1],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_1_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[2],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_2_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[3],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_4_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[4],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_16_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[5],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_100_kb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[6],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_1_mb",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[7],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_le_overflow",
+                stats.managedLedgerStats.entrySizeBuckets.getBuckets()[8],
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeProducerStat(allMetrics, "pulsar_producer_msg_rate_in", stats,
+                p -> p.msgRateIn, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeProducerStat(allMetrics, "pulsar_producer_msg_throughput_in", stats,
+                p -> p.msgThroughputIn, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeProducerStat(allMetrics, "pulsar_producer_msg_average_Size", stats,
+                p -> p.averageMsgSize, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_back_log", stats, s -> s.msgBacklog,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_back_log_no_delayed",
+                stats, s -> s.msgBacklogNoDelayed, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_delayed",
+                stats, s -> s.msgDelayed, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_rate_redeliver",
+                stats, s -> s.msgRateRedeliver, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_unacked_messages",
+                stats, s -> s.unackedMessages, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_blocked_on_unacked_messages",
+                stats, s -> s.blockedSubscriptionOnUnackedMsgs ? 1 : 0, cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_rate_out",
+                stats, s -> s.msgRateOut, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_ack_rate",
+                stats, s -> s.messageAckRate, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_throughput_out",
+                stats, s -> s.msgThroughputOut, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_out_bytes_total",
+                stats, s -> s.bytesOutCounter, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_out_messages_total",
+                stats, s -> s.msgOutCounter, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_last_expire_timestamp",
+                stats, s -> s.lastExpireTimestamp, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_last_acked_timestamp",
+                stats, s -> s.lastAckedTimestamp, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_last_consumed_flow_timestamp",
+                stats, s -> s.lastConsumedFlowTimestamp, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_last_consumed_timestamp",
+                stats, s -> s.lastConsumedTimestamp, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_last_mark_delete_advanced_timestamp",
+                stats, s -> s.lastMarkDeleteAdvancedTimestamp, cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_rate_expired",
+                stats, s -> s.msgRateExpired, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_total_msg_expired",
+                stats, s -> s.totalMsgExpired, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_msg_drop_rate",
+                stats, s -> s.msgDropRate, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeSubscriptionStat(allMetrics, "pulsar_subscription_consumers_count",
+                stats, s -> s.consumersCount, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeConsumerStat(allMetrics, "pulsar_consumer_msg_rate_redeliver", stats, c -> c.msgRateRedeliver,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_consumer_unacked_messages", stats, c -> c.unackedMessages,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_consumer_blocked_on_unacked_messages",
+                stats, c -> c.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_consumer_msg_rate_out", stats, c -> c.msgRateOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeConsumerStat(allMetrics, "pulsar_consumer_msg_ack_rate", stats, c -> c.msgAckRate,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeConsumerStat(allMetrics, "pulsar_consumer_msg_throughput_out", stats, c -> c.msgThroughputOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_consumer_available_permits", stats, c -> c.availablePermits,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_out_bytes_total", stats, c -> c.bytesOutCounter,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeConsumerStat(allMetrics, "pulsar_out_messages_total", stats, c -> c.msgOutCounter,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_in", stats, r -> r.msgRateIn,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_out", stats, r -> r.msgRateOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_throughput_in", stats, r -> r.msgThroughputIn,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_throughput_out", stats, r -> r.msgThroughputOut,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_backlog", stats, r -> r.replicationBacklog,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_connected_count", stats, r -> r.connectedCount,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_rate_expired", stats, r -> r.msgRateExpired,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeReplicationStat(allMetrics, "pulsar_replication_delay_in_seconds", stats,
+                r -> r.replicationDelayInSeconds, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+
+        writeMetric(allMetrics, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeMetric(allMetrics, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0],
+        // Compaction
+
+        writeCompactionStat(allMetrics, "pulsar_compaction_removed_event_count", stats.compactionRemovedEventCount,
+                compactorMXBean, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_succeed_count", stats.compactionSucceedCount,
+                compactorMXBean, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_failed_count", stats.compactionFailedCount, compactorMXBean,
+                cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_duration_time_in_mills", stats.compactionDurationTimeInMills,
+                compactorMXBean, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_read_throughput", stats.compactionReadThroughput,
+                compactorMXBean, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_write_throughput", stats.compactionWriteThroughput,
+                compactorMXBean, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_compacted_entries_count",
+                stats.compactionCompactedEntriesCount, compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1],
+        writeCompactionStat(allMetrics, "pulsar_compaction_compacted_entries_size",
+                stats.compactionCompactedEntriesSize, compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_0_5",
+                stats.compactionLatencyBuckets.getBuckets()[0], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_1",
+                stats.compactionLatencyBuckets.getBuckets()[1], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_5",
+                stats.compactionLatencyBuckets.getBuckets()[2], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_10",
+                stats.compactionLatencyBuckets.getBuckets()[3], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_20",
+                stats.compactionLatencyBuckets.getBuckets()[4], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_50",
+                stats.compactionLatencyBuckets.getBuckets()[5], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8],
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_100",
+                stats.compactionLatencyBuckets.getBuckets()[6], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
-                stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
-                stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel);
-
-        stats.producerStats.forEach((p, producerStats) -> {
-            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
-                    producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
-                    producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
-                    producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel);
-        });
-
-        stats.subscriptionStats.forEach((n, subsStats) -> {
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
-                    subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
-                    subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
-                    subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
-                    subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
-                    subsStats.unackedMessages, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
-                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
-                    subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
-                    subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
-                    subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
-                    subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
-                    subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
-                    subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
-                    subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
-                    subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
-                    subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
-                    subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
-                    subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
-                    subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
-                    subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count",
-                    subsStats.consumersCount, splitTopicAndPartitionIndexLabel);
-            subsStats.consumerStat.forEach((c, consumerStats) -> {
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_blocked_on_unacked_messages",
-                        consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
-                        splitTopicAndPartitionIndexLabel);
-
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
-                        splitTopicAndPartitionIndexLabel);
-
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_available_permits", consumerStats.availablePermits,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_out_messages_total", consumerStats.msgOutCounter,
-                        splitTopicAndPartitionIndexLabel);
-            });
-        });
-
-        if (!stats.replicationStats.isEmpty()) {
-            stats.replicationStats.forEach((remoteCluster, replStats) -> {
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
-                        replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
-                        replStats.msgRateOut, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
-                        remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
-                        remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
-                        replStats.replicationBacklog, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
-                        remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
-                        remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
-                        remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel);
-            });
-        }
-
-        metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter,
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_200",
+                stats.compactionLatencyBuckets.getBuckets()[7], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter,
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_le_1000",
+                stats.compactionLatencyBuckets.getBuckets()[8], compactorMXBean, cluster, namespace, name,
                 splitTopicAndPartitionIndexLabel);
-
-        // Compaction
-        boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
-                .map(__ -> true).orElse(false);
-        if (hasCompaction) {
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
-                    stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
-                    stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
-                    stats.compactionFailedCount, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
-                    stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
-                    stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
-                    stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
-                    stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
-                    stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel);
-            long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
-                    compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
-                    compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
-                    compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
-                    compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
-                    compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
-                    compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
-                    compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
-                    compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
-                    compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
-                    compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
-                    stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
-                    stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
-        }
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_overflow",
+                stats.compactionLatencyBuckets.getBuckets()[9], compactorMXBean, cluster, namespace, name,
+                splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_sum", stats.compactionLatencyBuckets.getSum(),
+                compactorMXBean, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
+        writeCompactionStat(allMetrics, "pulsar_compaction_latency_count", stats.compactionLatencyBuckets.getCount(),
+                compactorMXBean, cluster, namespace, name, splitTopicAndPartitionIndexLabel);
     }
-    static void metricType(SimpleTextOutputStream stream, String name) {
-
-        if (!metricWithTypeDefinition.containsKey(name)) {
-            metricWithTypeDefinition.put(name, "gauge");
-            stream.write("# TYPE ").write(name).write(" gauge\n");
-        }
-
+    private static void writeMetric(Map<String, ByteBuf> allMetrics, String metricName, Number value, String cluster,
+                String namespace, String topic, boolean splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        writeTopicSample(buffer, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
     }
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeProducerStat(Map<String, ByteBuf> allMetrics, String metricName, TopicStats topicStats,
+                Function<AggregatedProducerStats, Number> valueFunction,
+                String cluster, String namespace, String topic,
+                boolean splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        topicStats.producerStats.forEach((p, producerStats) ->
+                writeTopicSample(buffer, metricName, valueFunction.apply(producerStats), cluster, namespace, topic,
+                        splitTopicAndPartitionIndexLabel, "producer_name", p, "producer_id",
+                        String.valueOf(producerStats.producerId)));
     }
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeSubscriptionStat(Map<String, ByteBuf> allMetrics, String metricName, TopicStats topicStats,
+                Function<AggregatedSubscriptionStats, Number> valueFunction,
+                String cluster, String namespace, String topic,
+                boolean splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        topicStats.subscriptionStats.forEach((s, subStats) ->
+                writeTopicSample(buffer, metricName, valueFunction.apply(subStats), cluster, namespace, topic,
+                        splitTopicAndPartitionIndexLabel, "subscription", s));
     }
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",producer_name=\"").write(producerName)
-                .write("\",producer_id=\"").write(produceId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeConsumerStat(Map<String, ByteBuf> allMetrics, String metricName, TopicStats topicStats,
+                Function<AggregatedConsumerStats, Number> valueFunction,
+                String cluster, String namespace, String topic,
+                boolean splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+        topicStats.subscriptionStats.forEach((s, subStats) ->
+                subStats.consumerStat.forEach((c, conStats) ->
+                        writeTopicSample(buffer, metricName, valueFunction.apply(conStats), cluster, namespace, topic,
+                                splitTopicAndPartitionIndexLabel, "subscription", s, "consumer_name", c.consumerName(),
+                                "consumer_id", String.valueOf(c.consumerId()))
+                ));
     }
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
-    }
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String name, long value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
-                .write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeReplicationStat(Map<String, ByteBuf> allMetrics, String metricName, TopicStats topicStats,
+                Function<AggregatedReplicationStats, Number> valueFunction,
+                String cluster, String namespace, String topic,
+                boolean splitTopicAndPartitionIndexLabel) {
+        if (!topicStats.replicationStats.isEmpty()) {
+            ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+            topicStats.replicationStats.forEach((remoteCluster, replStats) ->
+                    writeTopicSample(buffer, metricName, valueFunction.apply(replStats), cluster, namespace, topic,
+                            splitTopicAndPartitionIndexLabel, "remote_cluster", remoteCluster)
+            );
+        }
     }
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String name, double value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
-                .write(consumerId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeCompactionStat(Map<String, ByteBuf> allMetrics, String metricName,
+                Number value, Optional<CompactorMXBean> compactorMXBean,
+                String cluster, String namespace, String topic,
+                boolean splitTopicAndPartitionIndexLabel) {
+        boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
+                .isPresent();
+        if (hasCompaction) {
+            ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+            writeTopicSample(buffer, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        }
     }
-    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
-            String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    static void writeMetricWithBrokerDefault(Map<String, ByteBuf> allMetrics, String metricName, Number value,
+                String cluster, String namespace, String topic,
+                boolean splitTopicAndPartitionIndexLabel) {
+        ByteBuf buffer = writeGaugeTypeWithBrokerDefault(allMetrics, metricName, cluster);
+        writeTopicSample(buffer, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
     }
-    private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
-            String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+    static void writeTopicSample(ByteBuf buffer, String metricName, Number value, String cluster,
+                String namespace, String topic, boolean splitTopicAndPartitionIndexLabel,
+                String... extraLabelsAndValues) {
Review Comment:
@codelipenghui is ok with this
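
For readers following along: the quoted snippet stops at the writeTopicSample signature, so its body is not shown here. Below is a minimal Java sketch of how such a varargs-based sample writer could look. The exact label layout, the "-partition-" splitting convention, and the lack of label-value escaping are illustrative assumptions, not the implementation from PR #15558.

import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;

final class TopicSampleSketch {

    // Illustrative only: emits one Prometheus sample line such as
    //   pulsar_subscription_back_log{cluster="c",namespace="ns",topic="t",subscription="sub"} 42
    static void writeTopicSample(ByteBuf buffer, String metricName, Number value, String cluster,
                                 String namespace, String topic, boolean splitTopicAndPartitionIndexLabel,
                                 String... extraLabelsAndValues) {
        StringBuilder sb = new StringBuilder(metricName)
                .append("{cluster=\"").append(cluster)
                .append("\",namespace=\"").append(namespace).append('"');
        int idx = splitTopicAndPartitionIndexLabel ? topic.lastIndexOf("-partition-") : -1;
        if (idx >= 0) {
            // Assumed convention: split "my-topic-partition-3" into topic="my-topic" and partition="3".
            sb.append(",topic=\"").append(topic, 0, idx).append('"')
                    .append(",partition=\"").append(topic.substring(idx + "-partition-".length())).append('"');
        } else {
            sb.append(",topic=\"").append(topic).append('"');
        }
        // extraLabelsAndValues arrives as (name, value) pairs, e.g. "subscription", s
        // or "producer_name", p, "producer_id", String.valueOf(producerStats.producerId).
        for (int i = 0; i + 1 < extraLabelsAndValues.length; i += 2) {
            sb.append(',').append(extraLabelsAndValues[i])
                    .append("=\"").append(extraLabelsAndValues[i + 1]).append('"');
        }
        sb.append("} ").append(value).append('\n');
        buffer.writeCharSequence(sb, StandardCharsets.US_ASCII);
    }
}

Passing labels as (name, value) pairs through String... extraLabelsAndValues lets one helper cover the subscription, producer, consumer, and remote_cluster samples, replacing the many metric(...) overloads removed earlier in this diff.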