This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new bc69cfbe9d3 [fix][broker][functions-worker] Ensure prometheus metrics
are grouped by type (#8407, #13865) (#17618)
bc69cfbe9d3 is described below
commit bc69cfbe9d3cf06fc6e407c284eed45f095b41be
Author: Mark Silcox <[email protected]>
AuthorDate: Tue Sep 20 09:38:06 2022 +0100
[fix][broker][functions-worker] Ensure prometheus metrics are grouped by
type (#8407, #13865) (#17618)
---
.../stats/prometheus/AggregatedNamespaceStats.java | 2 +-
.../stats/prometheus/NamespaceStatsAggregator.java | 354 ++++++------
.../stats/prometheus/PrometheusMetricStreams.java | 75 +++
.../prometheus/PrometheusMetricsGenerator.java | 60 ++-
.../pulsar/broker/stats/prometheus/TopicStats.java | 598 ++++++++++-----------
.../stats/prometheus/TransactionAggregator.java | 321 +++++------
.../metrics/PrometheusTextFormatUtil.java | 32 --
.../pulsar/broker/stats/PrometheusMetricsTest.java | 58 ++
.../prometheus/PrometheusMetricStreamsTest.java | 85 +++
.../pulsar/common/util/SimpleTextOutputStream.java | 13 +-
.../instance/stats/PrometheusTextFormat.java | 5 +
.../functions/worker/WorkerStatsManager.java | 5 +
12 files changed, 878 insertions(+), 730 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 5610dbab218..1980af91b7b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -96,7 +96,7 @@ public class AggregatedNamespaceStats {
stats.replicationStats.forEach((n, as) -> {
AggregatedReplicationStats replStats =
- replicationStats.computeIfAbsent(n, k -> new
AggregatedReplicationStats());
+ replicationStats.computeIfAbsent(n, k -> new
AggregatedReplicationStats());
replStats.msgRateIn += as.msgRateIn;
replStats.msgRateOut += as.msgRateOut;
replStats.msgThroughputIn += as.msgThroughputIn;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 16e438e2a2e..29915f071c0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -19,8 +19,11 @@
package org.apache.pulsar.broker.stats.prometheus;
import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -32,7 +35,6 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
@@ -40,72 +42,75 @@ import org.apache.pulsar.compaction.CompactorMXBean;
@Slf4j
public class NamespaceStatsAggregator {
- private static FastThreadLocal<AggregatedNamespaceStats>
localNamespaceStats =
+ private static final FastThreadLocal<AggregatedNamespaceStats>
localNamespaceStats =
new FastThreadLocal<AggregatedNamespaceStats>() {
@Override
- protected AggregatedNamespaceStats initialValue() throws
Exception {
+ protected AggregatedNamespaceStats initialValue() {
return new AggregatedNamespaceStats();
}
};
- private static FastThreadLocal<TopicStats> localTopicStats = new
FastThreadLocal<TopicStats>() {
+ private static final FastThreadLocal<TopicStats> localTopicStats = new
FastThreadLocal<TopicStats>() {
@Override
- protected TopicStats initialValue() throws Exception {
+ protected TopicStats initialValue() {
return new TopicStats();
}
};
public static void generate(PulsarService pulsar, boolean
includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean
splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
+ boolean includeProducerMetrics, boolean
splitTopicAndPartitionIndexLabel,
+ PrometheusMetricStreams stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
- TopicStats.resetTypes();
TopicStats topicStats = localTopicStats.get();
+ Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
+ LongAdder topicsCount = new LongAdder();
+ Map<String, Long> localNamespaceTopicCount = new HashMap<>();
printDefaultBrokerStats(stream, cluster);
- Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
- LongAdder topicsCount = new LongAdder();
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace,
bundlesMap) -> {
namespaceStats.reset();
topicsCount.reset();
- bundlesMap.forEach((bundle, topicsMap) -> {
- topicsMap.forEach((name, topic) -> {
- getTopicStats(topic, topicStats, includeConsumerMetrics,
includeProducerMetrics,
-
pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
-
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
- compactorMXBean
- );
-
- if (includeTopicMetrics) {
- topicsCount.add(1);
- TopicStats.printTopicStats(stream, cluster, namespace,
name, topicStats, compactorMXBean,
- splitTopicAndPartitionIndexLabel);
- } else {
- namespaceStats.updateStats(topicStats);
- }
- });
- });
+ bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name,
topic) -> {
+ getTopicStats(topic, topicStats, includeConsumerMetrics,
includeProducerMetrics,
+
pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
+
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
+ compactorMXBean
+ );
+
+ if (includeTopicMetrics) {
+ topicsCount.add(1);
+ TopicStats.printTopicStats(stream, topicStats,
compactorMXBean, cluster, namespace, name,
+ splitTopicAndPartitionIndexLabel);
+ } else {
+ namespaceStats.updateStats(topicStats);
+ }
+ }));
if (!includeTopicMetrics) {
- // Only include namespace level stats if we don't have the
per-topic, otherwise we're going to report
- // the same data twice, and it will make the aggregation
difficult
- printNamespaceStats(stream, cluster, namespace,
namespaceStats);
+ // Only include namespace level stats if we don't have the
per-topic, otherwise we're going to
+ // report the same data twice, and it will make the
aggregation difficult
+ printNamespaceStats(stream, namespaceStats, cluster,
namespace);
} else {
- printTopicsCountStats(stream, cluster, namespace, topicsCount);
+ localNamespaceTopicCount.put(namespace, topicsCount.sum());
}
});
+
+ if (includeTopicMetrics) {
+ printTopicsCountStats(stream, localNamespaceTopicCount, cluster);
+ }
}
private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService
pulsar) {
Compactor compactor = pulsar.getNullableCompactor();
- return Optional.ofNullable(compactor).map(c -> c.getStats());
+ return Optional.ofNullable(compactor).map(Compactor::getStats);
}
private static void getTopicStats(Topic topic, TopicStats stats, boolean
includeConsumerMetrics,
- boolean includeProducerMetrics, boolean getPreciseBacklog, boolean
subscriptionBacklogSize,
- Optional<CompactorMXBean>
compactorMXBean) {
+ boolean includeProducerMetrics, boolean
getPreciseBacklog,
+ boolean subscriptionBacklogSize,
Optional<CompactorMXBean> compactorMXBean) {
stats.reset();
if (topic instanceof PersistentTopic) {
@@ -267,161 +272,176 @@ public class NamespaceStatsAggregator {
});
}
- private static void printDefaultBrokerStats(SimpleTextOutputStream stream,
String cluster) {
+ private static void printDefaultBrokerStats(PrometheusMetricStreams
stream, String cluster) {
// Print metrics with 0 values. This is necessary to have the
available brokers being
// reported in the brokers dashboard even if they don't have any topic
or traffic
- metric(stream, cluster, "pulsar_topics_count", 0);
- metric(stream, cluster, "pulsar_subscriptions_count", 0);
- metric(stream, cluster, "pulsar_producers_count", 0);
- metric(stream, cluster, "pulsar_consumers_count", 0);
- metric(stream, cluster, "pulsar_rate_in", 0);
- metric(stream, cluster, "pulsar_rate_out", 0);
- metric(stream, cluster, "pulsar_throughput_in", 0);
- metric(stream, cluster, "pulsar_throughput_out", 0);
- metric(stream, cluster, "pulsar_storage_size", 0);
- metric(stream, cluster, "pulsar_storage_logical_size", 0);
- metric(stream, cluster, "pulsar_storage_write_rate", 0);
- metric(stream, cluster, "pulsar_storage_read_rate", 0);
- metric(stream, cluster, "pulsar_msg_backlog", 0);
+ writeMetric(stream, "pulsar_topics_count", 0, cluster);
+ writeMetric(stream, "pulsar_subscriptions_count", 0, cluster);
+ writeMetric(stream, "pulsar_producers_count", 0, cluster);
+ writeMetric(stream, "pulsar_consumers_count", 0, cluster);
+ writeMetric(stream, "pulsar_rate_in", 0, cluster);
+ writeMetric(stream, "pulsar_rate_out", 0, cluster);
+ writeMetric(stream, "pulsar_throughput_in", 0, cluster);
+ writeMetric(stream, "pulsar_throughput_out", 0, cluster);
+ writeMetric(stream, "pulsar_storage_size", 0, cluster);
+ writeMetric(stream, "pulsar_storage_logical_size", 0, cluster);
+ writeMetric(stream, "pulsar_storage_write_rate", 0, cluster);
+ writeMetric(stream, "pulsar_storage_read_rate", 0, cluster);
+ writeMetric(stream, "pulsar_msg_backlog", 0, cluster);
}
- private static void printTopicsCountStats(SimpleTextOutputStream stream,
String cluster, String namespace,
- LongAdder topicsCount) {
- metric(stream, cluster, namespace, "pulsar_topics_count",
topicsCount.sum());
+ private static void printTopicsCountStats(PrometheusMetricStreams stream,
Map<String, Long> namespaceTopicsCount,
+ String cluster) {
+ namespaceTopicsCount.forEach(
+ (ns, topicCount) -> writeMetric(stream, "pulsar_topics_count",
topicCount, cluster, ns)
+ );
}
- private static void printNamespaceStats(SimpleTextOutputStream stream,
String cluster, String namespace,
- AggregatedNamespaceStats stats) {
- metric(stream, cluster, namespace, "pulsar_topics_count",
stats.topicsCount);
- metric(stream, cluster, namespace, "pulsar_subscriptions_count",
stats.subscriptionsCount);
- metric(stream, cluster, namespace, "pulsar_producers_count",
stats.producersCount);
- metric(stream, cluster, namespace, "pulsar_consumers_count",
stats.consumersCount);
-
- metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
- metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
- metric(stream, cluster, namespace, "pulsar_throughput_in",
stats.throughputIn);
- metric(stream, cluster, namespace, "pulsar_throughput_out",
stats.throughputOut);
- metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate",
stats.messageAckRate);
-
- metric(stream, cluster, namespace, "pulsar_in_bytes_total",
stats.bytesInCounter);
- metric(stream, cluster, namespace, "pulsar_in_messages_total",
stats.msgInCounter);
- metric(stream, cluster, namespace, "pulsar_out_bytes_total",
stats.bytesOutCounter);
- metric(stream, cluster, namespace, "pulsar_out_messages_total",
stats.msgOutCounter);
-
- metric(stream, cluster, namespace, "pulsar_storage_size",
stats.managedLedgerStats.storageSize);
- metric(stream, cluster, namespace, "pulsar_storage_logical_size",
stats.managedLedgerStats.storageLogicalSize);
- metric(stream, cluster, namespace, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize);
- metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
- stats.managedLedgerStats.offloadedStorageUsed);
-
- metric(stream, cluster, namespace, "pulsar_storage_write_rate",
stats.managedLedgerStats.storageWriteRate);
- metric(stream, cluster, namespace, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate);
-
- metric(stream, cluster, namespace, "pulsar_subscription_delayed",
stats.msgDelayed);
-
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_msg_backlog", "local", stats.msgBacklog);
+ private static void printNamespaceStats(PrometheusMetricStreams stream,
AggregatedNamespaceStats stats,
+ String cluster, String namespace) {
+ writeMetric(stream, "pulsar_topics_count", stats.topicsCount, cluster,
namespace);
+ writeMetric(stream, "pulsar_subscriptions_count",
stats.subscriptionsCount, cluster,
+ namespace);
+ writeMetric(stream, "pulsar_producers_count", stats.producersCount,
cluster, namespace);
+ writeMetric(stream, "pulsar_consumers_count", stats.consumersCount,
cluster, namespace);
+
+ writeMetric(stream, "pulsar_rate_in", stats.rateIn, cluster,
namespace);
+ writeMetric(stream, "pulsar_rate_out", stats.rateOut, cluster,
namespace);
+ writeMetric(stream, "pulsar_throughput_in", stats.throughputIn,
cluster, namespace);
+ writeMetric(stream, "pulsar_throughput_out", stats.throughputOut,
cluster, namespace);
+ writeMetric(stream, "pulsar_consumer_msg_ack_rate",
stats.messageAckRate, cluster, namespace);
+
+ writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter,
cluster, namespace);
+ writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter,
cluster, namespace);
+ writeMetric(stream, "pulsar_out_bytes_total", stats.bytesOutCounter,
cluster, namespace);
+ writeMetric(stream, "pulsar_out_messages_total", stats.msgOutCounter,
cluster, namespace);
+
+ writeMetric(stream, "pulsar_storage_size",
stats.managedLedgerStats.storageSize, cluster,
+ namespace);
+ writeMetric(stream, "pulsar_storage_logical_size",
+ stats.managedLedgerStats.storageLogicalSize, cluster,
namespace);
+ writeMetric(stream, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize, cluster,
+ namespace);
+ writeMetric(stream, "pulsar_storage_offloaded_size",
+ stats.managedLedgerStats.offloadedStorageUsed, cluster,
namespace);
+
+ writeMetric(stream, "pulsar_storage_write_rate",
stats.managedLedgerStats.storageWriteRate,
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate,
+ cluster, namespace);
+
+ writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed,
cluster, namespace);
+
+ writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
long[] latencyBuckets =
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_1", latencyBuckets[1]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_5", latencyBuckets[2]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_10", latencyBuckets[3]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_20", latencyBuckets[4]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_50", latencyBuckets[5]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_100", latencyBuckets[6]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_200", latencyBuckets[7]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_overflow", latencyBuckets[9]);
- metric(stream, cluster, namespace,
"pulsar_storage_write_latency_count",
-
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+ writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
latencyBuckets[0], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1",
latencyBuckets[1], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_5",
latencyBuckets[2], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_10",
latencyBuckets[3], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_20",
latencyBuckets[4], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_50",
latencyBuckets[5], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_100",
latencyBuckets[6], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_200",
latencyBuckets[7], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1000",
latencyBuckets[8], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_overflow",
latencyBuckets[9], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_count",
+
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), cluster,
namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_sum",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(),
cluster, namespace);
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
- long[] ledgerWritelatencyBuckets =
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_overflow",
- ledgerWritelatencyBuckets[9]);
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_count",
-
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace,
"pulsar_storage_ledger_write_latency_sum",
-
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+ long[] ledgerWriteLatencyBuckets =
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5",
ledgerWriteLatencyBuckets[0],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1",
ledgerWriteLatencyBuckets[1],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5",
ledgerWriteLatencyBuckets[2],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10",
ledgerWriteLatencyBuckets[3],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20",
ledgerWriteLatencyBuckets[4],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50",
ledgerWriteLatencyBuckets[5],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100",
ledgerWriteLatencyBuckets[6],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200",
ledgerWriteLatencyBuckets[7],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000",
ledgerWriteLatencyBuckets[8],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow",
ledgerWriteLatencyBuckets[9],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), cluster,
namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
+
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), cluster,
namespace);
stats.managedLedgerStats.entrySizeBuckets.refresh();
long[] entrySizeBuckets =
stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- metric(stream, cluster, namespace, "pulsar_entry_size_le_128",
entrySizeBuckets[0]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_512",
entrySizeBuckets[1]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb",
entrySizeBuckets[2]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb",
entrySizeBuckets[3]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb",
entrySizeBuckets[4]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb",
entrySizeBuckets[5]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb",
entrySizeBuckets[6]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb",
entrySizeBuckets[7]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow",
entrySizeBuckets[8]);
- metric(stream, cluster, namespace, "pulsar_entry_size_count",
- stats.managedLedgerStats.entrySizeBuckets.getCount());
- metric(stream, cluster, namespace, "pulsar_entry_size_sum",
- stats.managedLedgerStats.entrySizeBuckets.getSum());
-
- if (!stats.replicationStats.isEmpty()) {
- stats.replicationStats.forEach((remoteCluster, replStats) -> {
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_replication_rate_in", remoteCluster,
- replStats.msgRateIn);
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_replication_rate_out", remoteCluster,
- replStats.msgRateOut);
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_replication_throughput_in", remoteCluster,
- replStats.msgThroughputIn);
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_replication_throughput_out", remoteCluster,
- replStats.msgThroughputOut);
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_replication_backlog", remoteCluster,
- replStats.replicationBacklog);
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_replication_connected_count", remoteCluster,
- replStats.connectedCount);
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_replication_rate_expired", remoteCluster,
- replStats.msgRateExpired);
- metricWithRemoteCluster(stream, cluster, namespace,
"pulsar_replication_delay_in_seconds",
- remoteCluster, replStats.replicationDelayInSeconds);
- });
- }
+ writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0],
cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1],
cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_100_kb",
entrySizeBuckets[6], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_overflow",
entrySizeBuckets[8], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_count",
stats.managedLedgerStats.entrySizeBuckets.getCount(),
+ cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_sum",
stats.managedLedgerStats.entrySizeBuckets.getSum(),
+ cluster, namespace);
+
+ writeReplicationStat(stream, "pulsar_replication_rate_in", stats,
+ replStats -> replStats.msgRateIn, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_rate_out", stats,
+ replStats -> replStats.msgRateOut, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_throughput_in", stats,
+ replStats -> replStats.msgThroughputIn, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_throughput_out",
stats,
+ replStats -> replStats.msgThroughputOut, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_backlog", stats,
+ replStats -> replStats.replicationBacklog, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_connected_count",
stats,
+ replStats -> replStats.connectedCount, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
+ replStats -> replStats.msgRateExpired, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_delay_in_seconds",
stats,
+ replStats -> replStats.replicationDelayInSeconds, cluster,
namespace);
}
- private static void metric(SimpleTextOutputStream stream, String cluster,
String name,
- long value) {
- TopicStats.metricType(stream, name);
- stream.write(name)
- .write("{cluster=\"").write(cluster).write("\"} ")
- .write(value).write(' ').write(System.currentTimeMillis())
- .write('\n');
+ private static void writePulsarMsgBacklog(PrometheusMetricStreams stream,
Number value,
+ String cluster, String
namespace) {
+ stream.writeSample("pulsar_msg_backlog", value, "cluster", cluster,
"namespace", namespace,
+ "remote_cluster",
+ "local");
}
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String name,
- long value) {
- TopicStats.metricType(stream, name);
-
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"}
");
- stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ private static void writeMetric(PrometheusMetricStreams stream, String
metricName, Number value,
+ String cluster) {
+ stream.writeSample(metricName, value, "cluster", cluster);
}
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String name,
- double value) {
- TopicStats.metricType(stream, name);
-
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"}
");
- stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ private static void writeMetric(PrometheusMetricStreams stream, String
metricName, Number value, String cluster,
+ String namespace) {
+ stream.writeSample(metricName, value, "cluster", cluster, "namespace",
namespace);
}
- private static void metricWithRemoteCluster(SimpleTextOutputStream stream,
String cluster, String namespace,
- String name, String
remoteCluster, double value) {
- TopicStats.metricType(stream, name);
-
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
- stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"}
");
- stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ private static void writeReplicationStat(PrometheusMetricStreams stream,
String metricName,
+ AggregatedNamespaceStats
namespaceStats,
+
Function<AggregatedReplicationStats, Number> sampleValueFunction,
+ String cluster, String namespace)
{
+ if (!namespaceStats.replicationStats.isEmpty()) {
+ namespaceStats.replicationStats.forEach((remoteCluster, replStats)
->
+ stream.writeSample(metricName,
sampleValueFunction.apply(replStats),
+ "cluster", cluster,
+ "namespace", namespace,
+ "remote_cluster", remoteCluster)
+ );
+ }
}
+
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
new file mode 100644
index 00000000000..6b6b972c175
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Helper class to ensure that metrics of the same name are grouped together
under the same TYPE header when written.
+ * Those are the requirements of the
+ * <a
href="https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#grouping-and-sorting">Prometheus
Exposition Format</a>.
+ */
+public class PrometheusMetricStreams {
+ private final Map<String, SimpleTextOutputStream> metricStreamMap = new
HashMap<>();
+
+ /**
+ * Write the given metric and sample value to the stream. Writes the "# TYPE"
header (always of type gauge) the first time a metric name is seen.
+ * @param metricName name of the metric.
+ * @param value value of the sample
+ * @param labelsAndValuesArray alternating label names and label values (even length expected)
+ */
+ void writeSample(String metricName, Number value, String...
labelsAndValuesArray) {
+ SimpleTextOutputStream stream = initGaugeType(metricName);
+ stream.write(metricName).write('{');
+ for (int i = 0; i < labelsAndValuesArray.length; i += 2) {
+
stream.write(labelsAndValuesArray[i]).write("=\"").write(labelsAndValuesArray[i
+ 1]).write('\"');
+ if (i + 2 != labelsAndValuesArray.length) {
+ stream.write(',');
+ }
+ }
+ stream.write("} ").write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ }
+
+ /**
+ * Flush all the stored metrics to the supplied stream.
+ * @param stream the stream to write to.
+ */
+ void flushAllToStream(SimpleTextOutputStream stream) {
+ metricStreamMap.values().forEach(s -> stream.write(s.getBuffer()));
+ }
+
+ /**
+ * Release all the streams to clean up resources.
+ */
+ void releaseAll() {
+ metricStreamMap.values().forEach(s -> s.getBuffer().release());
+ metricStreamMap.clear();
+ }
+
+ private SimpleTextOutputStream initGaugeType(String metricName) {
+ return metricStreamMap.computeIfAbsent(metricName, s -> {
+ SimpleTextOutputStream stream = new
SimpleTextOutputStream(PulsarByteBufAllocator.DEFAULT.directBuffer());
+ stream.write("# TYPE ").write(metricName).write(" gauge\n");
+ return stream;
+ });
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index cd6afd1535d..a993d1edf3a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -52,7 +52,8 @@ import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
* Generate metrics aggregated at the namespace level and optionally at a
topic level and formats them out
* in a text format suitable to be consumed by Prometheus.
- * Format specification can be found at {@link
https://prometheus.io/docs/instrumenting/exposition_formats/}
+ * Format specification can be found at <a
+ *
href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition
Formats</a>
*/
public class PrometheusMetricsGenerator {
@@ -86,38 +87,44 @@ public class PrometheusMetricsGenerator {
}
public static void generate(PulsarService pulsar, boolean
includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, OutputStream out) throws IOException {
+ boolean includeProducerMetrics, OutputStream
out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
includeProducerMetrics, false, out, null);
}
public static void generate(PulsarService pulsar, boolean
includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean
splitTopicAndPartitionIndexLabel,
- OutputStream out) throws IOException {
+ boolean includeProducerMetrics, boolean
splitTopicAndPartitionIndexLabel,
+ OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
includeProducerMetrics,
splitTopicAndPartitionIndexLabel, out, null);
}
public static void generate(PulsarService pulsar, boolean
includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean
splitTopicAndPartitionIndexLabel, OutputStream out,
- List<PrometheusRawMetricsProvider> metricsProviders)
- throws IOException {
+ boolean includeProducerMetrics, boolean
splitTopicAndPartitionIndexLabel,
+ OutputStream out,
+ List<PrometheusRawMetricsProvider>
metricsProviders)
+ throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+ boolean exceptionHappens = false;
+ // Shared by the namespace/topic and transaction aggregators, because they emit metrics
+ // with the same names
+ PrometheusMetricStreams metricStreams = new PrometheusMetricStreams();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
generateSystemMetrics(stream,
pulsar.getConfiguration().getClusterName());
NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics,
includeConsumerMetrics,
- includeProducerMetrics, splitTopicAndPartitionIndexLabel,
stream);
+ includeProducerMetrics, splitTopicAndPartitionIndexLabel,
metricStreams);
if (pulsar.getWorkerServiceOpt().isPresent()) {
pulsar.getWorkerService().generateFunctionsStats(stream);
}
if (pulsar.getConfiguration().isTransactionCoordinatorEnabled()) {
- TransactionAggregator.generate(pulsar, stream,
includeTopicMetrics);
+ TransactionAggregator.generate(pulsar, metricStreams,
includeTopicMetrics);
}
+ metricStreams.flushAllToStream(stream);
+
generateBrokerBasicMetrics(pulsar, stream);
generateManagedLedgerBookieClientMetrics(pulsar, stream);
@@ -129,7 +136,12 @@ public class PrometheusMetricsGenerator {
}
out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
- buf.release();
+ //release all the metrics buffers
+ metricStreams.releaseAll();
+ //if exception happens, release buffer
+ if (exceptionHappens) {
+ buf.release();
+ }
}
}
@@ -142,17 +154,17 @@ public class PrometheusMetricsGenerator {
if
(pulsar.getConfiguration().isExposeManagedLedgerMetricsInPrometheus()) {
// generate managedLedger metrics
parseMetricsToPrometheusMetrics(new
ManagedLedgerMetrics(pulsar).generate(),
- clusterName, Collector.Type.GAUGE, stream);
+ clusterName, Collector.Type.GAUGE, stream);
}
if
(pulsar.getConfiguration().isExposeManagedCursorMetricsInPrometheus()) {
// generate managedCursor metrics
parseMetricsToPrometheusMetrics(new
ManagedCursorMetrics(pulsar).generate(),
- clusterName, Collector.Type.GAUGE, stream);
+ clusterName, Collector.Type.GAUGE, stream);
}
parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService()
-
.getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
+
.getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
clusterName, Collector.Type.GAUGE, stream);
// generate loadBalance metrics
@@ -267,17 +279,17 @@ public class PrometheusMetricsGenerator {
static String getTypeStr(Collector.Type type) {
switch (type) {
- case COUNTER:
- return "counter";
- case GAUGE:
- return "gauge";
- case SUMMARY :
- return "summary";
- case HISTOGRAM:
- return "histogram";
- case UNTYPED:
- default:
- return "untyped";
+ case COUNTER:
+ return "counter";
+ case GAUGE:
+ return "gauge";
+ case SUMMARY:
+ return "summary";
+ case HISTOGRAM:
+ return "histogram";
+ case UNTYPED:
+ default:
+ return "untyped";
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index e6e5883847d..e91521aff55 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -23,12 +23,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.compaction.CompactionRecord;
import org.apache.pulsar.compaction.CompactorMXBean;
class TopicStats {
-
int subscriptionsCount;
int producersCount;
int consumersCount;
@@ -43,7 +43,6 @@ class TopicStats {
double averageMsgSize;
public long msgBacklog;
-
long publishRateLimitedTimes;
long backlogQuotaLimit;
@@ -55,9 +54,6 @@ class TopicStats {
Map<String, AggregatedSubscriptionStats> subscriptionStats = new
HashMap<>();
Map<String, AggregatedProducerStats> producerStats = new HashMap<>();
- // Used for tracking duplicate TYPE definitions
- static Map<String, String> metricWithTypeDefinition = new HashMap<>();
-
// For compaction
long compactionRemovedEventCount;
long compactionSucceedCount;
@@ -103,378 +99,340 @@ class TopicStats {
compactionLatencyBuckets.reset();
}
- static void resetTypes() {
- metricWithTypeDefinition.clear();
- }
-
- static void printTopicStats(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- TopicStats stats, Optional<CompactorMXBean>
compactorMXBean,
- boolean splitTopicAndPartitionIndexLabel) {
- metric(stream, cluster, namespace, topic,
"pulsar_subscriptions_count", stats.subscriptionsCount,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_producers_count",
stats.producersCount,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_consumers_count",
stats.consumersCount,
- splitTopicAndPartitionIndexLabel);
-
- metric(stream, cluster, namespace, topic, "pulsar_rate_in",
stats.rateIn,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_rate_out",
stats.rateOut,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_throughput_in",
stats.throughputIn,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_throughput_out",
stats.throughputOut,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_average_msg_size",
stats.averageMsgSize,
- splitTopicAndPartitionIndexLabel);
-
- metric(stream, cluster, namespace, topic, "pulsar_storage_size",
stats.managedLedgerStats.storageSize,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_logical_size",
- stats.managedLedgerStats.storageLogicalSize,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_msg_backlog",
stats.msgBacklog,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_rate",
- stats.managedLedgerStats.storageWriteRate,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_backlog_size",
- stats.managedLedgerStats.backlogSize,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_offloaded_size", stats.managedLedgerStats
- .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_backlog_quota_limit_time",
- stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
+ public static void printTopicStats(PrometheusMetricStreams stream,
TopicStats stats,
+ Optional<CompactorMXBean>
compactorMXBean, String cluster, String namespace,
+ String topic, boolean
splitTopicAndPartitionIndexLabel) {
+ writeMetric(stream, "pulsar_subscriptions_count",
stats.subscriptionsCount,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_producers_count", stats.producersCount,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_consumers_count", stats.consumersCount,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+
+ writeMetric(stream, "pulsar_rate_in", stats.rateIn,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_rate_out", stats.rateOut,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_throughput_in", stats.throughputIn,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_throughput_out", stats.throughputOut,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+
+ writeMetric(stream, "pulsar_storage_size",
stats.managedLedgerStats.storageSize,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_logical_size",
+ stats.managedLedgerStats.storageLogicalSize, cluster,
namespace, topic,
+ splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_msg_backlog", stats.msgBacklog,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_rate",
stats.managedLedgerStats.storageWriteRate,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_read_rate",
stats.managedLedgerStats.storageReadRate,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_publish_rate_limit_times",
stats.publishRateLimitedTimes,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_offloaded_size",
stats.managedLedgerStats
+ .offloadedStorageUsed, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_backlog_quota_limit",
stats.backlogQuotaLimit,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_backlog_quota_limit_time",
stats.backlogQuotaLimitTime,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
long[] latencyBuckets =
stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_1", latencyBuckets[1],
+ writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
+ latencyBuckets[0], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1",
+ latencyBuckets[1], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_5",
+ latencyBuckets[2], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_10",
+ latencyBuckets[3], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_20",
+ latencyBuckets[4], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_50",
+ latencyBuckets[5], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_100",
+ latencyBuckets[6], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_200",
+ latencyBuckets[7], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1000",
+ latencyBuckets[8], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_overflow",
+ latencyBuckets[9], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_count",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_sum",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(),
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_5", latencyBuckets[2],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_10", latencyBuckets[3],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_20", latencyBuckets[4],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_50", latencyBuckets[5],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_100", latencyBuckets[6],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_200", latencyBuckets[7],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_le_1000", latencyBuckets[8],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_overflow", latencyBuckets[9],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_count",
-
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(),
splitTopicAndPartitionIndexLabel);
long[] ledgerWriteLatencyBuckets =
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_0_5",
- ledgerWriteLatencyBuckets[0],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_1",
- ledgerWriteLatencyBuckets[1],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_5",
- ledgerWriteLatencyBuckets[2],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_10",
- ledgerWriteLatencyBuckets[3],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_20",
- ledgerWriteLatencyBuckets[4],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_50",
- ledgerWriteLatencyBuckets[5],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_100",
- ledgerWriteLatencyBuckets[6],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_200",
- ledgerWriteLatencyBuckets[7],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_le_1000",
- ledgerWriteLatencyBuckets[8],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_overflow",
- ledgerWriteLatencyBuckets[9],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_count",
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5",
+ ledgerWriteLatencyBuckets[0], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1",
+ ledgerWriteLatencyBuckets[1], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5",
+ ledgerWriteLatencyBuckets[2], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10",
+ ledgerWriteLatencyBuckets[3], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20",
+ ledgerWriteLatencyBuckets[4], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50",
+ ledgerWriteLatencyBuckets[5], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100",
+ ledgerWriteLatencyBuckets[6], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200",
+ ledgerWriteLatencyBuckets[7], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000",
+ ledgerWriteLatencyBuckets[8], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow",
+ ledgerWriteLatencyBuckets[9], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_storage_ledger_write_latency_sum",
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
- splitTopicAndPartitionIndexLabel);
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
long[] entrySizeBuckets =
stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128",
entrySizeBuckets[0],
+ writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0],
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512",
entrySizeBuckets[1],
+ writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1],
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb",
entrySizeBuckets[2],
+ writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb",
entrySizeBuckets[3],
+ writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb",
entrySizeBuckets[4],
+ writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
+ writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
+ writeMetric(stream, "pulsar_entry_size_le_100_kb",
entrySizeBuckets[6], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb",
entrySizeBuckets[7],
+ writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_entry_size_le_overflow", entrySizeBuckets[8],
+ writeMetric(stream, "pulsar_entry_size_le_overflow",
entrySizeBuckets[8], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
- stats.managedLedgerStats.entrySizeBuckets.getCount(),
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
- stats.managedLedgerStats.entrySizeBuckets.getSum(),
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_entry_size_count",
stats.managedLedgerStats.entrySizeBuckets.getCount(),
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_entry_size_sum",
stats.managedLedgerStats.entrySizeBuckets.getSum(),
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
stats.producerStats.forEach((p, producerStats) -> {
- metric(stream, cluster, namespace, topic, p,
producerStats.producerId, "pulsar_producer_msg_rate_in",
- producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, p,
producerStats.producerId, "pulsar_producer_msg_throughput_in",
- producerStats.msgThroughputIn,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, p,
producerStats.producerId, "pulsar_producer_msg_average_Size",
- producerStats.averageMsgSize,
splitTopicAndPartitionIndexLabel);
+ writeProducerMetric(stream, "pulsar_producer_msg_rate_in",
producerStats.msgRateIn,
+ cluster, namespace, topic, p, producerStats.producerId,
splitTopicAndPartitionIndexLabel);
+ writeProducerMetric(stream, "pulsar_producer_msg_throughput_in",
producerStats.msgThroughputIn,
+ cluster, namespace, topic, p, producerStats.producerId,
splitTopicAndPartitionIndexLabel);
+ writeProducerMetric(stream, "pulsar_producer_msg_average_Size",
producerStats.averageMsgSize,
+ cluster, namespace, topic, p, producerStats.producerId,
splitTopicAndPartitionIndexLabel);
});
- stats.subscriptionStats.forEach((n, subsStats) -> {
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_back_log",
- subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_back_log_no_delayed",
- subsStats.msgBacklogNoDelayed,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_delayed",
- subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_rate_redeliver",
- subsStats.msgRateRedeliver,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_unacked_messages",
- subsStats.unackedMessages,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_blocked_on_unacked_messages",
- subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_rate_out",
- subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_ack_rate",
- subsStats.messageAckRate,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_throughput_out",
- subsStats.msgThroughputOut,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_out_bytes_total",
- subsStats.bytesOutCounter,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_out_messages_total",
- subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_last_expire_timestamp",
- subsStats.lastExpireTimestamp,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_last_acked_timestamp",
- subsStats.lastAckedTimestamp,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_last_consumed_flow_timestamp",
- subsStats.lastConsumedFlowTimestamp,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_last_consumed_timestamp",
- subsStats.lastConsumedTimestamp,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_last_mark_delete_advanced_timestamp",
- subsStats.lastMarkDeleteAdvancedTimestamp,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_msg_rate_expired",
- subsStats.msgRateExpired,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n,
"pulsar_subscription_total_msg_expired",
- subsStats.totalMsgExpired,
splitTopicAndPartitionIndexLabel);
+ stats.subscriptionStats.forEach((sub, subsStats) -> {
+ writeSubscriptionMetric(stream, "pulsar_subscription_back_log",
subsStats.msgBacklog,
+ cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_back_log_no_delayed",
+ subsStats.msgBacklogNoDelayed, cluster, namespace, topic,
sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
+ subsStats.msgDelayed, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_msg_rate_redeliver",
+ subsStats.msgRateRedeliver, cluster, namespace, topic,
sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_unacked_messages",
+ subsStats.unackedMessages, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_blocked_on_unacked_messages",
+ subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
cluster, namespace, topic, sub,
+ splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_out",
+ subsStats.msgRateOut, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_msg_ack_rate",
+ subsStats.messageAckRate, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_msg_throughput_out",
+ subsStats.msgThroughputOut, cluster, namespace, topic,
sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_out_bytes_total",
+ subsStats.bytesOutCounter, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_out_messages_total",
+ subsStats.msgOutCounter, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_last_expire_timestamp",
+ subsStats.lastExpireTimestamp, cluster, namespace, topic,
sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_last_acked_timestamp",
+ subsStats.lastAckedTimestamp, cluster, namespace, topic,
sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_last_consumed_flow_timestamp",
+ subsStats.lastConsumedFlowTimestamp, cluster, namespace,
topic, sub,
+ splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_last_consumed_timestamp",
+ subsStats.lastConsumedTimestamp, cluster, namespace,
topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_last_mark_delete_advanced_timestamp",
+ subsStats.lastMarkDeleteAdvancedTimestamp, cluster,
namespace, topic, sub,
+ splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_msg_rate_expired",
+ subsStats.msgRateExpired, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream,
"pulsar_subscription_total_msg_expired",
+ subsStats.totalMsgExpired, cluster, namespace, topic, sub,
splitTopicAndPartitionIndexLabel);
+
subsStats.consumerStat.forEach((c, consumerStats) -> {
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_consumer_msg_rate_redeliver",
consumerStats.msgRateRedeliver,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_consumer_unacked_messages",
consumerStats.unackedMessages,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_consumer_blocked_on_unacked_messages",
+ writeConsumerMetric(stream,
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream,
"pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream,
"pulsar_consumer_blocked_on_unacked_messages",
consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_consumer_msg_rate_out",
consumerStats.msgRateOut,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_consumer_msg_throughput_out",
consumerStats.msgThroughputOut,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_consumer_available_permits",
consumerStats.availablePermits,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_out_bytes_total",
consumerStats.bytesOutCounter,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(),
c.consumerId(),
- "pulsar_out_messages_total",
consumerStats.msgOutCounter,
- splitTopicAndPartitionIndexLabel);
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_consumer_msg_rate_out",
consumerStats.msgRateOut,
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
+
+ writeConsumerMetric(stream, "pulsar_consumer_msg_ack_rate",
consumerStats.msgAckRate,
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
+
+ writeConsumerMetric(stream,
"pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream,
"pulsar_consumer_available_permits", consumerStats.availablePermits,
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_out_bytes_total",
consumerStats.bytesOutCounter,
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_out_messages_total",
consumerStats.msgOutCounter,
+ cluster, namespace, topic, sub, c,
splitTopicAndPartitionIndexLabel);
});
});
if (!stats.replicationStats.isEmpty()) {
stats.replicationStats.forEach((remoteCluster, replStats) -> {
- metricWithRemoteCluster(stream, cluster, namespace, topic,
"pulsar_replication_rate_in", remoteCluster,
- replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic,
"pulsar_replication_rate_out", remoteCluster,
- replStats.msgRateOut,
splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic,
"pulsar_replication_throughput_in",
- remoteCluster, replStats.msgThroughputIn,
splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic,
"pulsar_replication_throughput_out",
- remoteCluster, replStats.msgThroughputOut,
splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic,
"pulsar_replication_backlog", remoteCluster,
- replStats.replicationBacklog,
splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic,
"pulsar_replication_connected_count",
- remoteCluster, replStats.connectedCount,
splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic,
"pulsar_replication_rate_expired",
- remoteCluster, replStats.msgRateExpired,
splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic,
"pulsar_replication_delay_in_seconds",
- remoteCluster, replStats.replicationDelayInSeconds,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_rate_in",
replStats.msgRateIn,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_rate_out",
replStats.msgRateOut,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_throughput_in",
replStats.msgThroughputIn,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_throughput_out",
replStats.msgThroughputOut,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_backlog",
replStats.replicationBacklog,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_connected_count",
replStats.connectedCount,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_rate_expired",
replStats.msgRateExpired,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_delay_in_seconds",
replStats.replicationDelayInSeconds,
+ cluster, namespace, topic, remoteCluster,
splitTopicAndPartitionIndexLabel);
});
}
- metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total",
stats.bytesInCounter,
+ writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter,
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_in_messages_total",
stats.msgInCounter,
+ writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter,
cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
// Compaction
boolean hasCompaction = compactorMXBean.flatMap(mxBean ->
mxBean.getCompactionRecordForTopic(topic))
- .map(__ -> true).orElse(false);
+ .isPresent();
if (hasCompaction) {
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_removed_event_count",
- stats.compactionRemovedEventCount,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_succeed_count",
- stats.compactionSucceedCount,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_failed_count",
- stats.compactionFailedCount,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_duration_time_in_mills",
- stats.compactionDurationTimeInMills,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_read_throughput",
- stats.compactionReadThroughput,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_write_throughput",
- stats.compactionWriteThroughput,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_compacted_entries_count",
- stats.compactionCompactedEntriesCount,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_compacted_entries_size",
- stats.compactionCompactedEntriesSize,
splitTopicAndPartitionIndexLabel);
- long[] compactionLatencyBuckets =
stats.compactionLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_0_5",
- compactionLatencyBuckets[0],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_1",
- compactionLatencyBuckets[1],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_5",
- compactionLatencyBuckets[2],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_10",
- compactionLatencyBuckets[3],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_20",
- compactionLatencyBuckets[4],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_50",
- compactionLatencyBuckets[5],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_100",
- compactionLatencyBuckets[6],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_200",
- compactionLatencyBuckets[7],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_le_1000",
- compactionLatencyBuckets[8],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_overflow",
- compactionLatencyBuckets[9],
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_sum",
- stats.compactionLatencyBuckets.getSum(),
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic,
"pulsar_compaction_latency_count",
- stats.compactionLatencyBuckets.getCount(),
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_removed_event_count",
+ stats.compactionRemovedEventCount, cluster, namespace,
topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_succeed_count",
+ stats.compactionSucceedCount, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_failed_count",
+ stats.compactionFailedCount, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_duration_time_in_mills",
+ stats.compactionDurationTimeInMills, cluster, namespace,
topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_read_throughput",
+ stats.compactionReadThroughput, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_write_throughput",
+ stats.compactionWriteThroughput, cluster, namespace,
topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_compacted_entries_count",
+ stats.compactionCompactedEntriesCount, cluster, namespace,
topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_compacted_entries_size",
+ stats.compactionCompactedEntriesSize, cluster, namespace,
topic, splitTopicAndPartitionIndexLabel);
+
+ long[] compactionBuckets =
stats.compactionLatencyBuckets.getBuckets();
+ writeMetric(stream, "pulsar_compaction_latency_le_0_5",
+ compactionBuckets[0], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_1",
+ compactionBuckets[1], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_5",
+ compactionBuckets[2], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_10",
+ compactionBuckets[3], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_20",
+ compactionBuckets[4], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_50",
+ compactionBuckets[5], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_100",
+ compactionBuckets[6], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_200",
+ compactionBuckets[7], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_1000",
+ compactionBuckets[8], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_overflow",
+ compactionBuckets[9], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_sum",
+ stats.compactionLatencyBuckets.getSum(), cluster,
namespace, topic,
+ splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_count",
+ stats.compactionLatencyBuckets.getCount(), cluster,
namespace, topic,
+ splitTopicAndPartitionIndexLabel);
}
}
- static void metricType(SimpleTextOutputStream stream, String name) {
-
- if (!metricWithTypeDefinition.containsKey(name)) {
- metricWithTypeDefinition.put(name, "gauge");
- stream.write("# TYPE ").write(name).write(" gauge\n");
- }
-
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- String name, double value, boolean
splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name,
splitTopicAndPartitionIndexLabel).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeMetric(PrometheusMetricStreams stream, String
metricName, Number value, String cluster,
+ String namespace, String topic, boolean
splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
}
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- String subscription, String name, long value, boolean
splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name,
splitTopicAndPartitionIndexLabel)
- .write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeMetric(PrometheusMetricStreams stream, String
metricName, Number value, String cluster,
+ String namespace, String topic, String
remoteCluster,
+ boolean splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel,
+ "remote_cluster", remoteCluster);
}
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- String producerName, long produceId, String name, double value,
boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name,
splitTopicAndPartitionIndexLabel)
- .write("\",producer_name=\"").write(producerName)
- .write("\",producer_id=\"").write(produceId).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeProducerMetric(PrometheusMetricStreams stream,
String metricName, Number value,
+ String cluster, String namespace,
String topic, String producer,
+ long producerId, boolean
splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel,
+ "producer_name", producer, "producer_id",
String.valueOf(producerId));
}
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- String subscription, String name, double value, boolean
splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name,
splitTopicAndPartitionIndexLabel)
- .write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value);
- appendEndings(stream);
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- String subscription, String consumerName, long consumerId, String
name, long value,
- boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name,
splitTopicAndPartitionIndexLabel)
- .write("\",subscription=\"").write(subscription)
-
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
- .write("\"} ");
- stream.write(value);
- appendEndings(stream);
- }
- private static void metric(SimpleTextOutputStream stream, String cluster,
String namespace, String topic,
- String subscription, String consumerName, long consumerId, String
name, double value,
- boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name,
splitTopicAndPartitionIndexLabel)
- .write("\",subscription=\"").write(subscription)
-
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
- .write(consumerId).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeSubscriptionMetric(PrometheusMetricStreams
stream, String metricName, Number value,
+ String cluster, String
namespace, String topic, String subscription,
+ boolean
splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel,
+ "subscription", subscription);
}
- private static void metricWithRemoteCluster(SimpleTextOutputStream stream,
String cluster, String namespace,
- String topic, String name, String remoteCluster, double value,
boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name,
splitTopicAndPartitionIndexLabel)
- .write("\",remote_cluster=\"").write(remoteCluster).write("\"}
");
- stream.write(value);
- appendEndings(stream);
+ private static void writeConsumerMetric(PrometheusMetricStreams stream,
String metricName, Number value,
+ String cluster, String namespace,
String topic, String subscription,
+ Consumer consumer, boolean
splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel,
+ "subscription", subscription, "consumer_name",
consumer.consumerName(),
+ "consumer_id", String.valueOf(consumer.consumerId()));
}
- private static SimpleTextOutputStream
appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
- String namespace, String topic, String name, boolean
splitTopicAndPartitionIndexLabel) {
-
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+ static void writeTopicMetric(PrometheusMetricStreams stream, String
metricName, Number value, String cluster,
+ String namespace, String topic, boolean
splitTopicAndPartitionIndexLabel,
+ String... extraLabelsAndValues) {
+ String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel
? 8 : 6];
+ labelsAndValues[0] = "cluster";
+ labelsAndValues[1] = cluster;
+ labelsAndValues[2] = "namespace";
+ labelsAndValues[3] = namespace;
+ labelsAndValues[4] = "topic";
if (splitTopicAndPartitionIndexLabel) {
int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX);
if (index > 0) {
- stream.write("\",topic=\"").write(topic.substring(0,
index)).write("\",partition=\"")
- .write(topic.substring(index +
PARTITIONED_TOPIC_SUFFIX.length()));
+ labelsAndValues[5] = topic.substring(0, index);
+ labelsAndValues[6] = "partition";
+ labelsAndValues[7] = topic.substring(index +
PARTITIONED_TOPIC_SUFFIX.length());
} else {
-
stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1");
+ labelsAndValues[5] = topic;
+ labelsAndValues[6] = "partition";
+ labelsAndValues[7] = "-1";
}
} else {
- stream.write("\",topic=\"").write(topic);
+ labelsAndValues[5] = topic;
}
- return stream;
- }
-
- private static void appendEndings(SimpleTextOutputStream stream) {
- stream.write(' ').write(System.currentTimeMillis()).write('\n');
+ String[] labels = ArrayUtils.addAll(labelsAndValues,
extraLabelsAndValues);
+ stream.writeSample(metricName, value, labels);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index e6ac1535f43..8c58b516333 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.broker.stats.prometheus;
import static
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import io.netty.util.concurrent.FastThreadLocal;
-import java.util.HashMap;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
@@ -30,7 +28,6 @@ import
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import
org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreStats;
@@ -38,21 +35,10 @@ import
org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreSt
@Slf4j
public class TransactionAggregator {
- /**
- * Used for tracking duplicate TYPE definitions.
- */
- private static final FastThreadLocal<Map<String, String>>
threadLocalMetricWithTypeDefinition =
- new FastThreadLocal() {
- @Override
- protected Map<String, String> initialValue() {
- return new HashMap<>();
- }
- };
-
private static final
FastThreadLocal<AggregatedTransactionCoordinatorStats>
localTransactionCoordinatorStats =
new FastThreadLocal<AggregatedTransactionCoordinatorStats>() {
@Override
- protected AggregatedTransactionCoordinatorStats initialValue()
throws Exception {
+ protected AggregatedTransactionCoordinatorStats initialValue()
{
return new AggregatedTransactionCoordinatorStats();
}
};
@@ -60,21 +46,18 @@ public class TransactionAggregator {
private static final FastThreadLocal<ManagedLedgerStats>
localManageLedgerStats =
new FastThreadLocal<ManagedLedgerStats>() {
@Override
- protected ManagedLedgerStats initialValue() throws Exception {
+ protected ManagedLedgerStats initialValue() {
return new ManagedLedgerStats();
}
};
- public static void generate(PulsarService pulsar, SimpleTextOutputStream
stream, boolean includeTopicMetrics) {
+ public static void generate(PulsarService pulsar, PrometheusMetricStreams
stream, boolean includeTopicMetrics) {
String cluster = pulsar.getConfiguration().getClusterName();
- Map<String, String> metricWithTypeDefinition =
threadLocalMetricWithTypeDefinition.get();
- metricWithTypeDefinition.clear();
if (includeTopicMetrics) {
-
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace,
bundlesMap) -> {
- bundlesMap.forEach((bundle, topicsMap) -> {
- topicsMap.forEach((name, topic) -> {
+
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace,
bundlesMap) ->
+ bundlesMap.forEach((bundle, topicsMap) ->
topicsMap.forEach((name, topic) -> {
if (topic instanceof PersistentTopic) {
topic.getSubscriptions().values().forEach(subscription -> {
try {
@@ -82,9 +65,8 @@ public class TransactionAggregator {
if
(!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
&& subscription instanceof
PersistentSubscription
&& ((PersistentSubscription)
subscription).checkIfPendingAckStoreInit()) {
- ManagedLedger managedLedger =
- ((PersistentSubscription)
subscription)
-
.getPendingAckManageLedger().get();
+ ManagedLedger managedLedger =
((PersistentSubscription) subscription)
+
.getPendingAckManageLedger().get();
generateManageLedgerStats(managedLedger,
stream, cluster, namespace,
name, subscription.getName());
}
@@ -93,9 +75,7 @@ public class TransactionAggregator {
}
});
}
- });
- });
- });
+ })));
}
AggregatedTransactionCoordinatorStats transactionCoordinatorStats =
localTransactionCoordinatorStats.get();
@@ -124,18 +104,18 @@ public class TransactionAggregator {
localManageLedgerStats.get().reset();
if (transactionMetadataStore instanceof
MLTransactionMetadataStore) {
- ManagedLedger managedLedger =
- ((MLTransactionMetadataStore)
transactionMetadataStore).getManagedLedger();
+ ManagedLedger managedLedger =
+ ((MLTransactionMetadataStore)
transactionMetadataStore).getManagedLedger();
generateManageLedgerStats(managedLedger,
stream, cluster,
NamespaceName.SYSTEM_NAMESPACE.toString(),
MLTransactionLogImpl.TRANSACTION_LOG_PREFIX +
transactionCoordinatorID.getId(),
MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME);
}
- });
+ });
}
- private static void generateManageLedgerStats(ManagedLedger managedLedger,
SimpleTextOutputStream stream,
+ private static void generateManageLedgerStats(ManagedLedger managedLedger,
PrometheusMetricStreams stream,
String cluster, String
namespace, String topic, String subscription) {
ManagedLedgerStats managedLedgerStats = localManageLedgerStats.get();
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl)
managedLedger.getStats();
@@ -157,174 +137,149 @@ public class TransactionAggregator {
managedLedgerStats.storageWriteRate =
mlStats.getAddEntryMessagesRate();
managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
- printManageLedgerStats(stream, cluster, namespace, topic,
- subscription, managedLedgerStats);
- }
-
- private static void metricType(SimpleTextOutputStream stream, String name)
{
- Map<String, String> metricWithTypeDefinition =
threadLocalMetricWithTypeDefinition.get();
- if (!metricWithTypeDefinition.containsKey(name)) {
- metricWithTypeDefinition.put(name, "gauge");
- stream.write("# TYPE ").write(name).write(" gauge\n");
- }
-
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster,
String name,
- double value, long coordinatorId) {
- metricType(stream, name);
- stream.write(name)
- .write("{cluster=\"").write(cluster)
- .write("\",coordinator_id=\"").write(coordinatorId).write("\"}
")
- .write(value).write(' ').write(System.currentTimeMillis())
- .write('\n');
+ printManageLedgerStats(stream, cluster, namespace, topic,
subscription, managedLedgerStats);
}
- private static void metrics(SimpleTextOutputStream stream, String cluster,
String namespace,
- String topic, String subscription, String
name, long value) {
- stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace)
-
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"}
");
- stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
- }
-
- private static void metrics(SimpleTextOutputStream stream, String cluster,
String namespace,
- String topic, String subscription, String
name, double value) {
- stream.write(name).write("{cluster=\"").write(cluster).write("\",
namespace=\"").write(namespace)
-
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"}
");
- stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
- }
-
- private static void printManageLedgerStats(SimpleTextOutputStream stream,
String cluster, String namespace,
+ private static void printManageLedgerStats(PrometheusMetricStreams stream,
String cluster, String namespace,
String topic, String
subscription, ManagedLedgerStats stats) {
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_size", stats.storageSize);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_logical_size", stats.storageLogicalSize);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_backlog_size", stats.backlogSize);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
+ writeMetric(stream, "pulsar_storage_size", stats.storageSize, cluster,
namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_logical_size",
stats.storageLogicalSize, cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_backlog_size", stats.backlogSize,
cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_offloaded_size",
stats.offloadedStorageUsed, cluster, namespace, topic,
+ subscription);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_rate", stats.storageWriteRate);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_read_rate", stats.storageReadRate);
+ writeMetric(stream, "pulsar_storage_write_rate",
stats.storageWriteRate, cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_read_rate", stats.storageReadRate,
cluster, namespace, topic,
+ subscription);
stats.storageWriteLatencyBuckets.refresh();
long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_storage_write_latency_count",
- stats.storageWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_storage_write_latency_sum",
- stats.storageWriteLatencyBuckets.getSum());
+ writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
latencyBuckets[0], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1",
latencyBuckets[1], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_5",
latencyBuckets[2], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_10",
latencyBuckets[3], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_20",
latencyBuckets[4], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_50",
latencyBuckets[5], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_100",
latencyBuckets[6], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_200",
latencyBuckets[7], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1000",
latencyBuckets[8], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_overflow",
latencyBuckets[9], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_count",
stats.storageWriteLatencyBuckets.getCount(),
+ cluster, namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_sum",
stats.storageWriteLatencyBuckets.getSum(), cluster,
+ namespace, topic, subscription);
stats.storageLedgerWriteLatencyBuckets.refresh();
- long[] ledgerWritelatencyBuckets =
stats.storageLedgerWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_0_5",
ledgerWritelatencyBuckets[0]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_1",
ledgerWritelatencyBuckets[1]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_5",
ledgerWritelatencyBuckets[2]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_10",
ledgerWritelatencyBuckets[3]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_20",
ledgerWritelatencyBuckets[4]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_50",
ledgerWritelatencyBuckets[5]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_100",
ledgerWritelatencyBuckets[6]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_200",
ledgerWritelatencyBuckets[7]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_1000",
ledgerWritelatencyBuckets[8]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_storage_ledger_write_latency_overflow",
- ledgerWritelatencyBuckets[9]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_storage_ledger_write_latency_count",
- stats.storageLedgerWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_storage_ledger_write_latency_sum",
- stats.storageLedgerWriteLatencyBuckets.getSum());
+ long[] ledgerWriteLatencyBuckets =
stats.storageLedgerWriteLatencyBuckets.getBuckets();
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5",
ledgerWriteLatencyBuckets[0], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1",
ledgerWriteLatencyBuckets[1], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5",
ledgerWriteLatencyBuckets[2], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10",
ledgerWriteLatencyBuckets[3], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20",
ledgerWriteLatencyBuckets[4], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50",
ledgerWriteLatencyBuckets[5], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100",
ledgerWriteLatencyBuckets[6], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200",
ledgerWriteLatencyBuckets[7], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000",
ledgerWriteLatencyBuckets[8], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow",
ledgerWriteLatencyBuckets[9], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
+ stats.storageLedgerWriteLatencyBuckets.getCount(), cluster,
namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
+ stats.storageLedgerWriteLatencyBuckets.getSum(), cluster,
namespace, topic, subscription);
stats.entrySizeBuckets.refresh();
long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_entry_size_le_128", entrySizeBuckets[0]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_entry_size_le_512", entrySizeBuckets[1]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
- metric(stream, cluster, namespace, topic, subscription,
"pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster,
- String namespace, String topic, String
subscription,
- String name, long value) {
-
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-
.write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"}
");
- stream.write(value).write('
').write(System.currentTimeMillis()).write('\n');
+ writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0],
cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1],
cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_100_kb",
entrySizeBuckets[6], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_overflow",
entrySizeBuckets[8], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_count",
stats.entrySizeBuckets.getCount(), cluster, namespace,
+ topic, subscription);
+ writeMetric(stream, "pulsar_entry_size_sum",
stats.entrySizeBuckets.getSum(), cluster, namespace, topic,
+ subscription);
}
- static void printTransactionCoordinatorStats(SimpleTextOutputStream
stream, String cluster,
+ /**
+ * Writes the samples for a single transaction coordinator: the active, committed, aborted,
+ * created, timeout and append-log counts, followed by the fourteen entries of
+ * {@code stats.executionLatency} (le_10 through le_overflow). Every sample is labelled with
+ * the cluster and the coordinator id.
+ *
+ * @param stream metric streams the samples are written to
+ * @param cluster value used for the {@code cluster} label
+ * @param stats aggregated statistics of the coordinator being reported
+ * @param coordinatorId value used for the {@code coordinator_id} label
+ */
+ static void printTransactionCoordinatorStats(PrometheusMetricStreams
stream, String cluster,
AggregatedTransactionCoordinatorStats stats,
long coordinatorId) {
- metric(stream, cluster, "pulsar_txn_active_count",
- stats.actives, coordinatorId);
- metric(stream, cluster, "pulsar_txn_committed_count",
- stats.committedCount, coordinatorId);
- metric(stream, cluster, "pulsar_txn_aborted_count",
- stats.abortedCount, coordinatorId);
- metric(stream, cluster, "pulsar_txn_created_count",
- stats.createdCount, coordinatorId);
- metric(stream, cluster, "pulsar_txn_timeout_count",
- stats.timeoutCount, coordinatorId);
- metric(stream, cluster, "pulsar_txn_append_log_count",
- stats.appendLogCount, coordinatorId);
+ writeMetric(stream, "pulsar_txn_active_count", stats.actives, cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_committed_count",
stats.committedCount, cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_aborted_count", stats.abortedCount,
cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_created_count", stats.createdCount,
cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_timeout_count", stats.timeoutCount,
cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_append_log_count",
stats.appendLogCount, cluster,
+ coordinatorId);
long[] latencyBuckets = stats.executionLatency;
- metric(stream, cluster, "pulsar_txn_execution_latency_le_10",
latencyBuckets[0], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_20",
latencyBuckets[1], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_50",
latencyBuckets[2], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_100",
latencyBuckets[3], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_500",
latencyBuckets[4], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_1000",
latencyBuckets[5], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_5000",
latencyBuckets[6], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_15000",
latencyBuckets[7], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_30000",
latencyBuckets[8], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_60000",
latencyBuckets[9], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_300000",
- latencyBuckets[10], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_1500000",
- latencyBuckets[11], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_3000000",
- latencyBuckets[12], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_overflow",
- latencyBuckets[13], coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_10",
latencyBuckets[0], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_20",
latencyBuckets[1], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_50",
latencyBuckets[2], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_100",
latencyBuckets[3], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_500",
latencyBuckets[4], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_1000",
latencyBuckets[5], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_5000",
latencyBuckets[6], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_15000",
latencyBuckets[7], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_30000",
latencyBuckets[8], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_60000",
latencyBuckets[9], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_300000",
latencyBuckets[10], cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_1500000",
latencyBuckets[11], cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_3000000",
latencyBuckets[12], cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_overflow",
latencyBuckets[13], cluster,
+ coordinatorId);
+ }
+
+    /**
+     * Writes one transaction-coordinator sample labelled with {@code cluster} and
+     * {@code coordinator_id}.
+     *
+     * <p>The value is accepted as a {@link Number}, like the subscription-level overload of this
+     * method, so integral counters and {@code long} bucket counts reach {@code writeSample}
+     * unchanged instead of first being widened to {@code double}.
+     *
+     * @param stream metric streams the sample is written to
+     * @param metricName name of the metric
+     * @param value sample value
+     * @param cluster value of the {@code cluster} label
+     * @param coordinatorId coordinator id, rendered as the {@code coordinator_id} label
+     */
+    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value,
+                                    String cluster, long coordinatorId) {
+        stream.writeSample(metricName, value, "cluster", cluster,
+                "coordinator_id", String.valueOf(coordinatorId));
+    }
+
+ /**
+ * Writes one sample labelled with {@code cluster}, {@code namespace}, {@code topic} and
+ * {@code subscription} by delegating to {@code stream.writeSample}.
+ *
+ * @param stream metric streams the sample is written to
+ * @param metricName name of the metric
+ * @param value sample value
+ * @param cluster value of the {@code cluster} label
+ * @param namespace value of the {@code namespace} label
+ * @param topic value of the {@code topic} label
+ * @param subscription value of the {@code subscription} label
+ */
+ private static void writeMetric(PrometheusMetricStreams stream, String
metricName, Number value, String cluster,
+ String namespace, String topic, String
subscription) {
+ stream.writeSample(metricName, value, "cluster", cluster, "namespace",
namespace, "topic", topic,
+ "subscription", subscription);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
index 7550096c2b5..8f704b11e76 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
@@ -18,13 +18,8 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;
-import io.prometheus.client.Collector;
-import io.prometheus.client.Collector.MetricFamilySamples;
-import io.prometheus.client.Collector.MetricFamilySamples.Sample;
-import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.Writer;
-import java.util.Enumeration;
import org.apache.bookkeeper.stats.Counter;
/**
@@ -140,31 +135,4 @@ public class PrometheusTextFormatUtil {
.append(success.toString()).append("\"} ")
.append(Double.toString(opStat.getSum(success))).append('\n');
}
-
- public static void writeMetricsCollectedByPrometheusClient(Writer w,
CollectorRegistry registry)
- throws IOException {
- Enumeration<MetricFamilySamples> metricFamilySamples =
registry.metricFamilySamples();
- while (metricFamilySamples.hasMoreElements()) {
- MetricFamilySamples metricFamily =
metricFamilySamples.nextElement();
-
- for (int i = 0; i < metricFamily.samples.size(); i++) {
- Sample sample = metricFamily.samples.get(i);
- w.write(sample.name);
- w.write('{');
- for (int j = 0; j < sample.labelNames.size(); j++) {
- if (j != 0) {
- w.write(", ");
- }
- w.write(sample.labelNames.get(j));
- w.write("=\"");
- w.write(sample.labelValues.get(j));
- w.write('"');
- }
-
- w.write("} ");
- w.write(Collector.doubleToGoString(sample.value));
- w.write('\n');
- }
- }
- }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index f28412ea751..be10c8dc65e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -50,6 +50,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@@ -1404,6 +1405,63 @@ public class PrometheusMetricsTest extends
BrokerTestBase {
assertEquals(cm.get(0).value, count);
}
+    /**
+     * Verifies that every sample line in the generated Prometheus output appears directly under
+     * the {@code # TYPE} definition of its own metric family, i.e. that samples of different
+     * metrics are not interleaved.
+     */
+    @Test
+    public void testMetricsGroupedByTypeDefinitions() throws Exception {
+        Producer<byte[]> p1 = null;
+        Producer<byte[]> p2 = null;
+        // Producers are closed in a finally block so a failed assertion does not leak them.
+        try {
+            p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+            p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+            for (int i = 0; i < 10; i++) {
+                String message = "my-message-" + i;
+                p1.send(message.getBytes());
+                p2.send(message.getBytes());
+            }
+
+            ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+            PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+            // Decode explicitly rather than with the platform default charset.
+            String metricsStr = statsOut.toString("UTF-8");
+
+            Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
+            Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
+            // Suffixes a sample name may carry on top of the family name declared by "# TYPE".
+            String[] derivedSuffixes = {"_bucket", "_count", "_sum", "_total", "_created"};
+
+            AtomicReference<String> currentMetric = new AtomicReference<>();
+            Splitter.on("\n").split(metricsStr).forEach(line -> {
+                if (line.isEmpty()) {
+                    return;
+                }
+                if (line.startsWith("#")) {
+                    // Remember the family declared by the current type definition
+                    Matcher typeMatcher = typePattern.matcher(line);
+                    checkArgument(typeMatcher.matches());
+                    currentMetric.set(typeMatcher.group(1));
+                    return;
+                }
+
+                Matcher metricMatcher = metricNamePattern.matcher(line);
+                checkArgument(metricMatcher.matches());
+                String sampleName = metricMatcher.group(1);
+
+                String declaredMetric = currentMetric.get();
+                if (declaredMetric == null) {
+                    fail("Metric emitted before any type definition: " + line);
+                }
+
+                String metricName = sampleName;
+                for (String suffix : derivedSuffixes) {
+                    if (!sampleName.endsWith(suffix)) {
+                        continue;
+                    }
+                    // "_bucket" is always derived; the other suffixes are stripped only when the
+                    // declared family name does not itself end with them.
+                    if ("_bucket".equals(suffix) || !declaredMetric.endsWith(suffix)) {
+                        // Cut the trailing suffix only; the same token may also occur earlier in
+                        // the name, so searching for its first occurrence would cut too much.
+                        metricName = sampleName.substring(0, sampleName.length() - suffix.length());
+                    }
+                    break;
+                }
+
+                if (!metricName.equals(declaredMetric)) {
+                    System.out.println(metricsStr);
+                    fail("Metric not grouped under its type definition: " + line);
+                }
+            });
+        } finally {
+            if (p1 != null) {
+                p1.close();
+            }
+            if (p2 != null) {
+                p2.close();
+            }
+        }
+    }
+
+
/**
* Hacky parsing of Prometheus text format. Should be good enough for unit
tests
*/
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
new file mode 100644
index 00000000000..15c29a0dc66
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link PrometheusMetricStreams}: samples written through it must be flushed
+ * as a {@code # TYPE} line followed by the sample lines of that metric.
+ */
+@Test(groups = "broker")
+public class PrometheusMetricStreamsTest {
+
+    private PrometheusMetricStreams underTest;
+
+    @BeforeMethod(alwaysRun = true)
+    protected void setup() throws Exception {
+        underTest = new PrometheusMetricStreams();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        underTest.releaseAll();
+    }
+
+    @Test
+    public void canWriteSampleWithoutLabels() {
+        underTest.writeSample("my-metric", 123);
+
+        String actual = writeToString();
+
+        assertTrue(actual.startsWith("# TYPE my-metric gauge"), "Gauge type line missing");
+        assertTrue(actual.contains("my-metric{} 123"), "Metric line missing");
+    }
+
+    @Test
+    public void canWriteSampleWithLabels() {
+        underTest.writeSample("my-other-metric", 123, "cluster", "local");
+        underTest.writeSample("my-other-metric", 456, "cluster", "local", "namespace", "my-ns");
+
+        String actual = writeToString();
+
+        assertTrue(actual.startsWith("# TYPE my-other-metric gauge"), "Gauge type line missing");
+        assertTrue(actual.contains("my-other-metric{cluster=\"local\"} 123"), "Cluster metric line missing");
+        assertTrue(actual.contains("my-other-metric{cluster=\"local\",namespace=\"my-ns\"} 456"),
+                "Cluster and Namespace metric line missing");
+    }
+
+    /**
+     * Flushes everything written to {@link #underTest} into a temporary buffer and returns the
+     * flushed content as text.
+     *
+     * @return the flushed metrics, decoded as UTF-8
+     */
+    private String writeToString() {
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
+        try {
+            underTest.flushAllToStream(new SimpleTextOutputStream(buffer));
+            // Decode the readable bytes with an explicit charset instead of copying them one by
+            // one and relying on the platform default encoding.
+            return buffer.toString(StandardCharsets.UTF_8);
+        } finally {
+            buffer.release();
+        }
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index 9fc4b347c85..dd78b4cfe58 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -22,12 +22,11 @@ import io.netty.buffer.ByteBuf;
/**
* Format strings and numbers into a ByteBuf without any memory allocation.
- *
*/
public class SimpleTextOutputStream {
private final ByteBuf buffer;
- private static final char[] hexChars = { '0', '1', '2', '3', '4', '5',
'6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
- 'f' };
+ private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6',
'7', '8', '9', 'a', 'b', 'c', 'd', 'e',
+ 'f'};
public SimpleTextOutputStream(ByteBuf buffer) {
this.buffer = buffer;
@@ -131,4 +130,12 @@ public class SimpleTextOutputStream {
write(r);
return this;
}
+
+ /**
+ * Appends the content of the given buffer to the underlying buffer by delegating to
+ * {@code buffer.writeBytes(byteBuf)}.
+ *
+ * <p>This overload returns nothing, so it cannot be chained.
+ * NOTE(review): Netty's {@code writeBytes(ByteBuf)} is expected to consume the source's
+ * readable bytes (advancing its reader index) - confirm before reusing {@code byteBuf}.
+ *
+ * @param byteBuf buffer whose content is appended
+ */
+ public void write(ByteBuf byteBuf) {
+ buffer.writeBytes(byteBuf);
+ }
+
+ /**
+ * Returns the buffer this stream writes into. This is the instance that was handed to the
+ * constructor, not a copy.
+ *
+ * @return the underlying {@link ByteBuf}
+ */
+ public ByteBuf getBuffer() {
+ return buffer;
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
index f7a205c7db0..f5aa273f656 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
@@ -37,6 +37,11 @@ public class PrometheusTextFormat {
*/
while (mfs.hasMoreElements()) {
Collector.MetricFamilySamples metricFamilySamples =
mfs.nextElement();
+            // Emit the "# TYPE <name> <type>" header once per metric family, before its samples.
+            writer.write("# TYPE ");
+            writer.write(metricFamilySamples.name);
+            writer.write(' ');
+            // Lower-case with Locale.ROOT: the default-locale variant is locale sensitive and
+            // would, for example, turn HISTOGRAM into "hıstogram" under a Turkish locale.
+            writer.write(metricFamilySamples.type.name().toLowerCase(java.util.Locale.ROOT));
+            writer.write('\n');
for (Collector.MetricFamilySamples.Sample sample :
metricFamilySamples.samples) {
writer.write(sample.name);
if (sample.labelNames.size() > 0) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index c8b411cbf57..2ad407b2e5e 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -328,6 +328,11 @@ public class WorkerStatsManager {
}
private void writeMetric(String metricName, long value, StringWriter stream)
{
+ stream.write("# TYPE ");
+ stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
+ stream.write(metricName);
+ stream.write(" gauge \n");
+
stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
stream.write(metricName);
stream.write("{");