marksilcox commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r913006917
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java:
##########
@@ -296,161 +306,264 @@ private static void getTopicStats(Topic topic,
TopicStats stats, boolean include
});
}
- private static void printDefaultBrokerStats(SimpleTextOutputStream stream,
String cluster) {
- // Print metrics with 0 values. This is necessary to have the
available brokers being
- // reported in the brokers dashboard even if they don't have any topic
or traffic
- metric(stream, cluster, "pulsar_topics_count", 0);
- metric(stream, cluster, "pulsar_subscriptions_count", 0);
- metric(stream, cluster, "pulsar_producers_count", 0);
- metric(stream, cluster, "pulsar_consumers_count", 0);
- metric(stream, cluster, "pulsar_rate_in", 0);
- metric(stream, cluster, "pulsar_rate_out", 0);
- metric(stream, cluster, "pulsar_throughput_in", 0);
- metric(stream, cluster, "pulsar_throughput_out", 0);
- metric(stream, cluster, "pulsar_storage_size", 0);
- metric(stream, cluster, "pulsar_storage_logical_size", 0);
- metric(stream, cluster, "pulsar_storage_write_rate", 0);
- metric(stream, cluster, "pulsar_storage_read_rate", 0);
- metric(stream, cluster, "pulsar_msg_backlog", 0);
+ private static void printTopicsCountStats(Map<String, ByteBuf> allMetrics,
Map<String, Long> namespaceTopicsCount,
+ String cluster) {
+ namespaceTopicsCount.forEach((ns, topicCount) ->
+ writeMetricWithBrokerDefault(allMetrics,
"pulsar_topics_count", topicCount, cluster, ns)
+ );
}
- private static void printTopicsCountStats(SimpleTextOutputStream stream,
String cluster, String namespace,
- LongAdder topicsCount) {
- metric(stream, cluster, namespace, "pulsar_topics_count",
topicsCount.sum());
- }
+ private static void printNamespaceStats(Map<String, ByteBuf> allMetrics,
AggregatedNamespaceStats stats,
+ String cluster, String namespace) {
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_topics_count",
stats.topicsCount, cluster, namespace);
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_subscriptions_count",
stats.subscriptionsCount, cluster,
+ namespace);
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_producers_count",
stats.producersCount, cluster, namespace);
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_consumers_count",
stats.consumersCount, cluster, namespace);
+
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_rate_in",
stats.rateIn, cluster, namespace);
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_rate_out",
stats.rateOut, cluster, namespace);
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_throughput_in",
stats.throughputIn, cluster, namespace);
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_throughput_out",
stats.throughputOut, cluster, namespace);
+ writeMetric(allMetrics, "pulsar_consumer_msg_ack_rate",
stats.messageAckRate, cluster, namespace);
+
+ writeMetric(allMetrics, "pulsar_in_bytes_total", stats.bytesInCounter,
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_in_messages_total",
stats.msgInCounter, cluster, namespace);
+ writeMetric(allMetrics, "pulsar_out_bytes_total",
stats.bytesOutCounter, cluster, namespace);
+ writeMetric(allMetrics, "pulsar_out_messages_total",
stats.msgOutCounter, cluster, namespace);
+
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_size",
stats.managedLedgerStats.storageSize, cluster,
+ namespace);
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_logical_size",
+ stats.managedLedgerStats.storageLogicalSize, cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize, cluster,
+ namespace);
+ writeMetric(allMetrics, "pulsar_storage_offloaded_size",
+ stats.managedLedgerStats.offloadedStorageUsed, cluster,
namespace);
+
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_write_rate",
stats.managedLedgerStats.storageWriteRate,
+ cluster, namespace);
+ writeMetricWithBrokerDefault(allMetrics, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate,
+ cluster, namespace);
+
+ writeMetric(allMetrics, "pulsar_subscription_delayed",
stats.msgDelayed, cluster, namespace);
+
+ writePulsarMsgBacklog(allMetrics, stats.msgBacklog, cluster,
namespace);
- private static void printNamespaceStats(SimpleTextOutputStream stream,
String cluster, String namespace,
- AggregatedNamespaceStats stats) {
- metric(stream, cluster, namespace, "pulsar_topics_count",
stats.topicsCount);
- metric(stream, cluster, namespace, "pulsar_subscriptions_count",
stats.subscriptionsCount);
- metric(stream, cluster, namespace, "pulsar_producers_count",
stats.producersCount);
- metric(stream, cluster, namespace, "pulsar_consumers_count",
stats.consumersCount);
+ stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_0_5",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[0], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_1",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[1], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_5",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[2], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_10",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[3], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_20",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[4], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_50",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[5], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_100",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[6], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_200",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[7], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_le_1000",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[8], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_overflow",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[9], cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_count",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_write_latency_sum",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(),
cluster, namespace);
- metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
- metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
- metric(stream, cluster, namespace, "pulsar_throughput_in",
stats.throughputIn);
- metric(stream, cluster, namespace, "pulsar_throughput_out",
stats.throughputOut);
- metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate",
stats.messageAckRate);
+ stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_0_5",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[0],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_1",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[1],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_5",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[2],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_10",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[3],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_20",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[4],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_50",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[5],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_100",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[6],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_200",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[7],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_le_1000",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[8],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_overflow",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[9],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_count",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_storage_ledger_write_latency_sum",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), cluster,
namespace);
- metric(stream, cluster, namespace, "pulsar_in_bytes_total",
stats.bytesInCounter);
- metric(stream, cluster, namespace, "pulsar_in_messages_total",
stats.msgInCounter);
- metric(stream, cluster, namespace, "pulsar_out_bytes_total",
stats.bytesOutCounter);
- metric(stream, cluster, namespace, "pulsar_out_messages_total",
stats.msgOutCounter);
+ stats.managedLedgerStats.entrySizeBuckets.refresh();
+ writeMetric(allMetrics, "pulsar_entry_size_le_128",
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[0],
+ cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_le_512",
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[1],
+ cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_le_1_kb",
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[2],
+ cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_le_2_kb",
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[3],
+ cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_le_4_kb",
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[4],
+ cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_le_16_kb",
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[5],
+ cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_le_100_kb",
+ stats.managedLedgerStats.entrySizeBuckets.getBuckets()[6],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_le_1_mb",
stats.managedLedgerStats.entrySizeBuckets.getBuckets()[7],
+ cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_le_overflow",
+ stats.managedLedgerStats.entrySizeBuckets.getBuckets()[8],
cluster, namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_count",
+ stats.managedLedgerStats.entrySizeBuckets.getCount(), cluster,
namespace);
+ writeMetric(allMetrics, "pulsar_entry_size_sum",
+ stats.managedLedgerStats.entrySizeBuckets.getSum(), cluster,
namespace);
+
+ writeReplicationStat(allMetrics, "pulsar_replication_rate_in", stats,
+ replStats -> replStats.msgRateIn, cluster, namespace);
+ writeReplicationStat(allMetrics, "pulsar_replication_rate_out", stats,
+ replStats -> replStats.msgRateOut, cluster, namespace);
+ writeReplicationStat(allMetrics, "pulsar_replication_throughput_in",
stats,
+ replStats -> replStats.msgThroughputIn, cluster, namespace);
+ writeReplicationStat(allMetrics, "pulsar_replication_throughput_out",
stats,
+ replStats -> replStats.msgThroughputOut, cluster, namespace);
+ writeReplicationStat(allMetrics, "pulsar_replication_backlog", stats,
+ replStats -> replStats.replicationBacklog, cluster, namespace);
+ writeReplicationStat(allMetrics, "pulsar_replication_connected_count",
stats,
+ replStats -> replStats.connectedCount, cluster, namespace);
+ writeReplicationStat(allMetrics, "pulsar_replication_rate_expired",
stats,
+ replStats -> replStats.msgRateExpired, cluster, namespace);
+ writeReplicationStat(allMetrics,
"pulsar_replication_delay_in_seconds", stats,
+ replStats -> replStats.replicationDelayInSeconds, cluster,
namespace);
+ }
- metric(stream, cluster, namespace, "pulsar_storage_size",
stats.managedLedgerStats.storageSize);
- metric(stream, cluster, namespace, "pulsar_storage_logical_size",
stats.managedLedgerStats.storageLogicalSize);
- metric(stream, cluster, namespace, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize);
- metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
- stats.managedLedgerStats.offloadedStorageUsed);
+ private static void writeMetricWithBrokerDefault(Map<String, ByteBuf>
allMetrics, String metricName, Number value,
+ String cluster, String
namespace) {
+ ByteBuf buffer = writeGaugeTypeWithBrokerDefault(allMetrics,
metricName, cluster);
+ writeSample(buffer, metricName, value, "cluster", cluster,
"namespace", namespace);
+ }
- metric(stream, cluster, namespace, "pulsar_storage_write_rate",
stats.managedLedgerStats.storageWriteRate);
- metric(stream, cluster, namespace, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate);
+ private static void writePulsarMsgBacklog(Map<String, ByteBuf> allMetrics,
Number value,
+ String cluster, String
namespace) {
+ ByteBuf buffer = writeGaugeTypeWithBrokerDefault(allMetrics,
"pulsar_msg_backlog", cluster);
+ writeSample(buffer, "pulsar_msg_backlog", value, "cluster", cluster,
"namespace", namespace, "remote_cluster",
+ "local");
+ }
- metric(stream, cluster, namespace, "pulsar_subscription_delayed",
stats.msgDelayed);
+ private static void writeMetric(Map<String, ByteBuf> allMetrics, String
metricName, Number value, String cluster,
+ String namespace) {
+ ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+ writeSample(buffer, metricName, value, "cluster", cluster,
"namespace", namespace);
+ }
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_msg_backlog", "local", stats.msgBacklog);
+ private static void writeReplicationStat(Map<String, ByteBuf> allMetrics,
String metricName,
+ AggregatedNamespaceStats
namespaceStats,
+
Function<AggregatedReplicationStats, Number> replStatsFunction,
+ String cluster, String namespace)
{
+ if (!namespaceStats.replicationStats.isEmpty()) {
+ ByteBuf buffer = writeGaugeType(allMetrics, metricName);
+ namespaceStats.replicationStats.forEach((remoteCluster, replStats)
->
+ writeSample(buffer, metricName,
replStatsFunction.apply(replStats),
+ "cluster", cluster,
+ "namespace", namespace,
+ "remote_cluster", remoteCluster)
+ );
+ }
+ }
- stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
- long[] latencyBuckets =
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_1", latencyBuckets[1]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_5", latencyBuckets[2]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_10", latencyBuckets[3]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_20", latencyBuckets[4]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_50", latencyBuckets[5]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_100", latencyBuckets[6]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_200", latencyBuckets[7]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_overflow", latencyBuckets[9]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_count",
-
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+ static ByteBuf writeGaugeType(Map<String, ByteBuf> allMetrics, String
metricName) {
+ if (!allMetrics.containsKey(metricName)) {
+ ByteBuf buffer =
UnpooledByteBufAllocator.DEFAULT.compositeDirectBuffer(MAX_COMPONENTS);
Review Comment:
This was copied from `PrometheusMetricsGenerator` to get me started, and I forgot to
replace it. It has now been replaced with `ByteBufAllocator.DEFAULT.heapBuffer()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]