This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5fad80cd5e3bb19850738e05fe764f18516def80 Author: lipenghui <[email protected]> AuthorDate: Thu Sep 30 11:56:09 2021 +0800 [Metrics] Add support for splitting topic and partition label in Prometheus (#12225) * [Metrics] Add support for splitting topic and partition label in Prometheus Fix: #11432 Currently, we only expose the partition name in the topic label of the Prometheus metrics, which makes it difficult to get aggregated metrics for a partitioned topic. Before this change, we could only get (topic=xxx-partition-0) in the metrics. After this change, we can get 2 labels (topic=xxx, partition=0). By default, the broker exposes a single label for the topic. To enable the split labels, set `splitTopicAndPartitionLabelInPrometheus=true` in broker.conf. New tests added. * Fix checkstyle. (cherry picked from commit 039079e850e7756c18929119ec215fff8ada643d) --- conf/broker.conf | 9 + conf/standalone.conf | 9 + .../apache/pulsar/broker/ServiceConfiguration.java | 11 + .../org/apache/pulsar/broker/PulsarService.java | 3 +- .../stats/prometheus/NamespaceStatsAggregator.java | 5 +- .../prometheus/PrometheusMetricsGenerator.java | 14 +- .../stats/prometheus/PrometheusMetricsServlet.java | 7 +- .../pulsar/broker/stats/prometheus/TopicStats.java | 352 ++++++++++++--------- .../pulsar/broker/stats/PrometheusMetricsTest.java | 42 +++ 9 files changed, 303 insertions(+), 149 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 775f9aa..d81e86c 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1172,6 +1172,15 @@ statsUpdateInitialDelayInSecs=60 # Default is false. exposePreciseBacklogInPrometheus=false +# Enable splitting topic and partition label in Prometheus. +# If enabled, a topic name will split into 2 parts, one is topic name without partition index, +# another one is partition index, e.g. (topic=xxx, partition=0). +# If the topic is a non-partitioned topic, -1 will be used for the partition index.
+# If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0) +# Default is false. + +splitTopicAndPartitionLabelInPrometheus=false + ### --- Schema storage --- ### # The schema storage implementation used by this broker schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory diff --git a/conf/standalone.conf b/conf/standalone.conf index 363ce3b..e13723f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -855,6 +855,15 @@ exposePublisherStats=true # Default is false. exposePreciseBacklogInPrometheus=false +# Enable splitting topic and partition label in Prometheus. +# If enabled, a topic name will split into 2 parts, one is topic name without partition index, +# another one is partition index, e.g. (topic=xxx, partition=0). +# If the topic is a non-partitioned topic, -1 will be used for the partition index. +# If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0) +# Default is false. + +splitTopicAndPartitionLabelInPrometheus=false + ### --- Deprecated config variables --- ### # Deprecated. Use configurationStoreServers diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a223b58..c9b0c48 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1995,6 +1995,17 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean exposeSubscriptionBacklogSizeInPrometheus = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Enable splitting topic and partition label in Prometheus.\n" + + " If enabled, a topic name will split into 2 parts, one is topic name without partition index,\n" + + " another one is partition index, e.g. 
(topic=xxx, partition=0).\n" + + " If the topic is a non-partitioned topic, -1 will be used for the partition index.\n" + + " If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)\n" + + " Default is false." + ) + private boolean splitTopicAndPartitionLabelInPrometheus = false; + /**** --- Functions --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index f3eae1a..23f50b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -680,7 +680,8 @@ public class PulsarService implements AutoCloseable { this.metricsServlet = new PrometheusMetricsServlet( this, config.isExposeTopicLevelMetricsInPrometheus(), config.isExposeConsumerLevelMetricsInPrometheus(), - config.isExposeProducerLevelMetricsInPrometheus()); + config.isExposeProducerLevelMetricsInPrometheus(), + config.isSplitTopicAndPartitionLabelInPrometheus()); if (pendingMetricsProviders != null) { pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider)); this.pendingMetricsProviders = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index a2661ed..9df9ad4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -57,7 +57,7 @@ public class NamespaceStatsAggregator { }; public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, SimpleTextOutputStream stream) { + 
boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) { String cluster = pulsar.getConfiguration().getClusterName(); AggregatedNamespaceStats namespaceStats = localNamespaceStats.get(); TopicStats.resetTypes(); @@ -81,7 +81,8 @@ public class NamespaceStatsAggregator { if (includeTopicMetrics) { topicsCount.add(1); - TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean); + TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean, + splitTopicAndPartitionIndexLabel); } else { namespaceStats.updateStats(topicStats); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 8f303a3..9d5e1c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -87,11 +87,19 @@ public class PrometheusMetricsGenerator { public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, boolean includeProducerMetrics, OutputStream out) throws IOException { - generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, out, null); + generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null); } public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean includeProducerMetrics, OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders) + boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, + OutputStream out) throws IOException { + generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, + 
splitTopicAndPartitionIndexLabel, out, null); + } + + public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, + boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out, + List<PrometheusRawMetricsProvider> metricsProviders) throws IOException { ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); try { @@ -100,7 +108,7 @@ public class PrometheusMetricsGenerator { generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName()); NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics, - includeProducerMetrics, stream); + includeProducerMetrics, splitTopicAndPartitionIndexLabel, stream); if (pulsar.getWorkerServiceOpt().isPresent()) { pulsar.getWorkerService().generateFunctionsStats(stream); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 026c26f..145f7a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -44,17 +44,19 @@ public class PrometheusMetricsServlet extends HttpServlet { private final boolean shouldExportConsumerMetrics; private final boolean shouldExportProducerMetrics; private final long metricsServletTimeoutMs; + private final boolean splitTopicAndPartitionLabel; private List<PrometheusRawMetricsProvider> metricsProviders; private ExecutorService executor = null; public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean shouldExportProducerMetrics) { + boolean shouldExportProducerMetrics, boolean splitTopicAndPartitionLabel) { this.pulsar = pulsar; this.shouldExportTopicMetrics = includeTopicMetrics; 
this.shouldExportConsumerMetrics = includeConsumerMetrics; this.shouldExportProducerMetrics = shouldExportProducerMetrics; this.metricsServletTimeoutMs = pulsar.getConfiguration().getMetricsServletTimeoutMs(); + this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel; } @Override @@ -73,7 +75,8 @@ public class PrometheusMetricsServlet extends HttpServlet { res.setStatus(HttpStatus.OK_200); res.setContentType("text/plain"); PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, - shouldExportProducerMetrics, res.getOutputStream(), metricsProviders); + shouldExportProducerMetrics, splitTopicAndPartitionLabel, res.getOutputStream(), + metricsProviders); context.complete(); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 61e0175..bfa427e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus; +import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -104,222 +105,263 @@ class TopicStats { } static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - TopicStats stats, Optional<CompactorMXBean> compactorMXBean) { - metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount); - metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount); - metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount); - - metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn); - metric(stream, cluster, 
namespace, topic, "pulsar_rate_out", stats.rateOut); - metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn); - metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut); - metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize); - - metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize); + TopicStats stats, Optional<CompactorMXBean> compactorMXBean, + boolean splitTopicAndPartitionIndexLabel) { + metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount, + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount, + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount, + splitTopicAndPartitionIndexLabel); + + metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn, + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut, + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn, + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut, + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize, + splitTopicAndPartitionIndexLabel); + + metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size", - stats.managedLedgerStats.storageLogicalSize); - metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog); + stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel); + metric(stream, 
cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size", - stats.managedLedgerStats.backlogSize); + stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats - .offloadedStorageUsed); - metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit); + .offloadedStorageUsed, splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time", - stats.backlogQuotaLimitTime); + stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel); long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1]); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2]); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3]); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4]); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5]); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6]); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7]); - metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]); - metric(stream, cluster, namespace, topic, 
"pulsar_storage_write_latency_overflow", latencyBuckets[9]); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9], + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count", - stats.managedLedgerStats.storageWriteLatencyBuckets.getCount()); + stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum", - stats.managedLedgerStats.storageWriteLatencyBuckets.getSum()); + stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel); long[] ledgerWriteLatencyBuckets = 
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets(); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5", - ledgerWriteLatencyBuckets[0]); + ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1", - ledgerWriteLatencyBuckets[1]); + ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5", - ledgerWriteLatencyBuckets[2]); + ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10", - ledgerWriteLatencyBuckets[3]); + ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20", - ledgerWriteLatencyBuckets[4]); + ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50", - ledgerWriteLatencyBuckets[5]); + ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100", - ledgerWriteLatencyBuckets[6]); + ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200", - ledgerWriteLatencyBuckets[7]); + ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000", - ledgerWriteLatencyBuckets[8]); + ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow", - ledgerWriteLatencyBuckets[9]); + ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, 
"pulsar_storage_ledger_write_latency_count", - stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount()); + stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum", - stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum()); + stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), + splitTopicAndPartitionIndexLabel); long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets(); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0]); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1]); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]); - metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]); + metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, 
"pulsar_entry_size_le_4_kb", entrySizeBuckets[4], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_entry_size_count", - stats.managedLedgerStats.entrySizeBuckets.getCount()); + stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", - stats.managedLedgerStats.entrySizeBuckets.getSum()); + stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel); stats.producerStats.forEach((p, producerStats) -> { metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in", - producerStats.msgRateIn); + producerStats.msgRateIn, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in", - producerStats.msgThroughputIn); + producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size", - producerStats.averageMsgSize); + producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel); }); stats.subscriptionStats.forEach((n, subsStats) -> { metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", - subsStats.msgBacklog); + subsStats.msgBacklog, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, 
"pulsar_subscription_back_log_no_delayed", - subsStats.msgBacklogNoDelayed); + subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", - subsStats.msgDelayed); + subsStats.msgDelayed, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", - subsStats.msgRateRedeliver); + subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages", - subsStats.unackedMessages); + subsStats.unackedMessages, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", - subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0); + subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", - subsStats.msgRateOut); + subsStats.msgRateOut, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", - subsStats.msgThroughputOut); + subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", - subsStats.bytesOutCounter); + subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", - subsStats.msgOutCounter); + subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp", - subsStats.lastExpireTimestamp); + subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp", - subsStats.lastAckedTimestamp); + subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, 
topic, n, "pulsar_subscription_last_consumed_flow_timestamp", - subsStats.lastConsumedFlowTimestamp); + subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp", - subsStats.lastConsumedTimestamp); + subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp", - subsStats.lastMarkDeleteAdvancedTimestamp); + subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired", - subsStats.msgRateExpired); + subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired", - subsStats.totalMsgExpired); + subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel); subsStats.consumerStat.forEach((c, consumerStats) -> { metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver); + "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_unacked_messages", consumerStats.unackedMessages); + "pulsar_consumer_unacked_messages", consumerStats.unackedMessages, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_blocked_on_unacked_messages", - consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0); + consumerStats.blockedSubscriptionOnUnackedMsgs ? 
1 : 0, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut); + "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut); + "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_consumer_available_permits", consumerStats.availablePermits); + "pulsar_consumer_available_permits", consumerStats.availablePermits, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_out_bytes_total", consumerStats.bytesOutCounter); + "pulsar_out_bytes_total", consumerStats.bytesOutCounter, + splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), - "pulsar_out_messages_total", consumerStats.msgOutCounter); + "pulsar_out_messages_total", consumerStats.msgOutCounter, + splitTopicAndPartitionIndexLabel); }); }); if (!stats.replicationStats.isEmpty()) { stats.replicationStats.forEach((remoteCluster, replStats) -> { metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster, - replStats.msgRateIn); + replStats.msgRateIn, splitTopicAndPartitionIndexLabel); metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster, - replStats.msgRateOut); + replStats.msgRateOut, splitTopicAndPartitionIndexLabel); metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in", - remoteCluster, - replStats.msgThroughputIn); + remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel); 
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out", - remoteCluster, - replStats.msgThroughputOut); + remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel); metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster, - replStats.replicationBacklog); + replStats.replicationBacklog, splitTopicAndPartitionIndexLabel); metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count", - remoteCluster, replStats.connectedCount); + remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel); metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired", - remoteCluster, replStats.msgRateExpired); + remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel); metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds", - remoteCluster, replStats.replicationDelayInSeconds); + remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel); }); } - metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter); - metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter); + metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter, + splitTopicAndPartitionIndexLabel); + metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter, + splitTopicAndPartitionIndexLabel); // Compaction boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic)) .map(__ -> true).orElse(false); if (hasCompaction) { metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count", - stats.compactionRemovedEventCount); + stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count", - 
stats.compactionSucceedCount); + stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count", - stats.compactionFailedCount); + stats.compactionFailedCount, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills", - stats.compactionDurationTimeInMills); + stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput", - stats.compactionReadThroughput); + stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput", - stats.compactionWriteThroughput); + stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count", - stats.compactionCompactedEntriesCount); + stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size", - stats.compactionCompactedEntriesSize); + stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel); long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets(); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5", - compactionLatencyBuckets[0]); + compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1", - compactionLatencyBuckets[1]); + compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5", - compactionLatencyBuckets[2]); + compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10", - compactionLatencyBuckets[3]); + compactionLatencyBuckets[3], 
splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20", - compactionLatencyBuckets[4]); + compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50", - compactionLatencyBuckets[5]); + compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100", - compactionLatencyBuckets[6]); + compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200", - compactionLatencyBuckets[7]); + compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000", - compactionLatencyBuckets[8]); + compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow", - compactionLatencyBuckets[9]); + compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum", - stats.compactionLatencyBuckets.getSum()); + stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel); metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count", - stats.compactionLatencyBuckets.getCount()); + stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel); } } @@ -333,64 +375,92 @@ class TopicStats { } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String name, double value) { + String name, double value, boolean splitTopicAndPartitionIndexLabel) { metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) - .write("\",topic=\"").write(topic).write("\"} "); - stream.write(value).write(' 
').write(System.currentTimeMillis()).write('\n'); + appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} "); + stream.write(value); + appendEndings(stream); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String subscription, String name, long value) { + String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) { metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) - .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) + .write("\",subscription=\"").write(subscription).write("\"} "); + stream.write(value); + appendEndings(stream); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String producerName, long produceId, String name, double value) { + String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) { metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) - .write("\",topic=\"").write(topic).write("\",producer_name=\"").write(producerName) + appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) + .write("\",producer_name=\"").write(producerName) .write("\",producer_id=\"").write(produceId).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + stream.write(value); + appendEndings(stream); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String subscription, String name, double value) { + String subscription, 
String name, double value, boolean splitTopicAndPartitionIndexLabel) { metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) - .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) + .write("\",subscription=\"").write(subscription).write("\"} "); + stream.write(value); + appendEndings(stream); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String subscription, String consumerName, long consumerId, String name, long value) { + String subscription, String consumerName, long consumerId, String name, long value, + boolean splitTopicAndPartitionIndexLabel) { metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription) + appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) + .write("\",subscription=\"").write(subscription) .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId) .write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + stream.write(value); + appendEndings(stream); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, - String subscription, String consumerName, long consumerId, String name, double value) { + String subscription, String consumerName, long consumerId, String name, double value, + boolean splitTopicAndPartitionIndexLabel) { metricType(stream, name); - stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) - 
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription) + appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) + .write("\",subscription=\"").write(subscription) .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"") .write(consumerId).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + stream.write(value); + appendEndings(stream); } private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace, - String topic, - String name, String remoteCluster, double value) { + String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) { metricType(stream, name); + appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel) + .write("\",remote_cluster=\"").write(remoteCluster).write("\"} "); + stream.write(value); + appendEndings(stream); + } + + private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster, + String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) { stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace); - stream.write("\",topic=\"").write(topic).write("\",remote_cluster=\"").write(remoteCluster).write("\"} "); - stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); + if (splitTopicAndPartitionIndexLabel) { + int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX); + if (index > 0) { + stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"") + .write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length())); + } else { + stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1"); + } + } else { + stream.write("\",topic=\"").write(topic); + } + return stream; + } + + private static void 
appendEndings(SimpleTextOutputStream stream) { + stream.write(' ').write(System.currentTimeMillis()).write('\n'); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 3de3a04..9f09893 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1222,6 +1222,48 @@ public class PrometheusMetricsTest extends BrokerTestBase { pulsarClient.close(); } + @Test + public void testSplitTopicAndPartitionLabel() throws Exception { + String ns1 = "prop/ns-abc1"; + String ns2 = "prop/ns-abc2"; + admin.namespaces().createNamespace(ns1); + admin.namespaces().createNamespace(ns2); + String baseTopic1 = "persistent://" + ns1 + "/testMetricsTopicCount"; + String baseTopic2 = "persistent://" + ns2 + "/testMetricsTopicCount"; + for (int i = 0; i < 6; i++) { + admin.topics().createNonPartitionedTopic(baseTopic1 + UUID.randomUUID()); + } + for (int i = 0; i < 3; i++) { + admin.topics().createPartitionedTopic(baseTopic2 + UUID.randomUUID(), 3); + } + Consumer<byte[]> consumer1 = pulsarClient.newConsumer() + .topicsPattern("persistent://" + ns1 + "/.*") + .subscriptionName("sub") + .subscribe(); + Consumer<byte[]> consumer2 = pulsarClient.newConsumer() + .topicsPattern("persistent://" + ns2 + "/.*") + .subscriptionName("sub") + .subscribe(); + @Cleanup + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, true, statsOut); + String metricsStr = statsOut.toString(); + Multimap<String, Metric> metrics = parseMetrics(metricsStr); + Collection<Metric> metric = metrics.get("pulsar_consumers_count"); + assertTrue(metric.size() >= 15); + metric.forEach(item -> { + if (ns1.equals(item.tags.get("namespace"))) { + 
assertEquals(item.tags.get("partition"), "-1"); + } + if (ns2.equals(item.tags.get("namespace"))) { + System.out.println(item); + assertTrue(Integer.parseInt(item.tags.get("partition")) >= 0); + } + }); + consumer1.close(); + consumer2.close(); + } + private void compareCompactionStateCount(List<Metric> cm, double count) { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test");
