sijie closed pull request #2679: Add types comment in Prometheus stats
URL: https://github.com/apache/pulsar/pull/2679
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index d07bc386ad..36343cfe30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -24,9 +24,12 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.eclipse.jetty.util.ConcurrentHashSet; import io.netty.util.concurrent.FastThreadLocal; +import java.util.Set; + public class NamespaceStatsAggregator { private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() { @@ -43,6 +46,8 @@ protected TopicStats initialValue() throws Exception { } }; + private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>(); + public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) { String cluster = pulsar.getConfiguration().getClusterName(); AggregatedNamespaceStats namespaceStats = localNamespaceStats.get(); @@ -51,6 +56,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> { namespaceStats.reset(); + METRIC_TYPES.forEach(metric -> TopicStats.metricType(stream, metric)); + bundlesMap.forEach((bundle, topicsMap) -> { 
topicsMap.forEach((name, topic) -> { getTopicStats(topic, topicStats, includeConsumerMetrics); @@ -218,20 +225,35 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, long value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\"} "); + if (!METRIC_TYPES.contains(name)) { + TopicStats.metricType(stream, name); + METRIC_TYPES.add(name); + } + + stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name, double value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\"} "); + if (!METRIC_TYPES.contains(name)) { + TopicStats.metricType(stream, name); + METRIC_TYPES.add(name); + } + + stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace, String name, String remoteCluster, double value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace); - stream.write("\", remote_cluster=\"").write(remoteCluster).write("\"} "); + if (!METRIC_TYPES.contains(name)) { + TopicStats.metricType(stream, name); + METRIC_TYPES.add(name); + } + + stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace); + stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } } diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 5c42b51328..e346d27c29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -86,12 +86,16 @@ private static void generateSystemMetrics(SimpleTextOutputStream stream, String while (metricFamilySamples.hasMoreElements()) { MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); + // Write type of metric + stream.write("# TYPE ").write(metricFamily.name).write(' ') + .write(getTypeStr(metricFamily.type)).write('\n'); + for (int i = 0; i < metricFamily.samples.size(); i++) { Sample sample = metricFamily.samples.get(i); stream.write(sample.name); stream.write("{cluster=\"").write(cluster).write('"'); for (int j = 0; j < sample.labelNames.size(); j++) { - stream.write(", "); + stream.write(","); stream.write(sample.labelNames.get(j)); stream.write("=\""); stream.write(sample.labelValues.get(j)); @@ -104,4 +108,21 @@ private static void generateSystemMetrics(SimpleTextOutputStream stream, String } } } + + static String getTypeStr(Collector.Type type) { + switch (type) { + case COUNTER: + return "counter"; + case GAUGE: + return "gauge"; + case SUMMARY : + return "summary"; + case HISTOGRAM: + return "histogram"; + case UNTYPED: + default: + return "untyped"; + } + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index c592b8f49b..ff946b95fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -18,12 
+18,14 @@ */ package org.apache.pulsar.broker.stats.prometheus; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.pulsar.common.util.SimpleTextOutputStream; - -import java.util.HashMap; -import java.util.Map; +import org.eclipse.jetty.util.ConcurrentHashSet; class TopicStats { @@ -45,6 +47,7 @@ Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>(); Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>(); + private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>(); public void reset() { subscriptionsCount = 0; @@ -69,6 +72,8 @@ public void reset() { static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic, TopicStats stats) { + METRIC_TYPES.forEach(metric -> metricType(stream, metric)); + metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount); metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount); metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount); @@ -129,40 +134,69 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin } + static void metricType(SimpleTextOutputStream stream, String name) { + stream.write("# TYPE ").write(name).write(" gauge\n"); + } + private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String name, double value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\", topic=\"").write(topic).write("\"} "); + if (!METRIC_TYPES.contains(name)) { + metricType(stream, name); + METRIC_TYPES.add(name); + } + + stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) + 
.write("\",topic=\"").write(topic).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String name, long value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription).write("\"} "); + if (!METRIC_TYPES.contains(name)) { + metricType(stream, name); + METRIC_TYPES.add(name); + } + + stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) + .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String name, double value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription).write("\"} "); + if (!METRIC_TYPES.contains(name)) { + metricType(stream, name); + METRIC_TYPES.add(name); + } + + stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) + .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String consumerName, long consumerId, String name, long value) { + if (!METRIC_TYPES.contains(name)) { + metricType(stream, name); + METRIC_TYPES.add(name); + } + stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\", 
topic=\"").write(topic).write("\", subscription=\"").write(subscription) - .write("\", consumer_name=\"").write(consumerName).write("\", consumer_id=\"").write(consumerId).write("\"} "); + .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription) + .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription, String consumerName, long consumerId, String name, double value) { - stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription) - .write("\", consumer_name=\"").write(consumerName).write("\", consumer_id=\"").write(consumerId).write("\"} "); + if (!METRIC_TYPES.contains(name)) { + metricType(stream, name); + METRIC_TYPES.add(name); + } + + stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) + .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription) + .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 6ad8d6d5ce..91bc0b826b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -150,7 +150,7 @@ public void testPerNamespaceStats() throws Exception { Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); 
Splitter.on("\n").split(metrics).forEach(line -> { - if (line.isEmpty()) { + if (line.isEmpty() || line.startsWith("#")) { return; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index 745f5d7297..907afaf198 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -21,11 +21,13 @@ import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; /** @@ -35,8 +37,12 @@ private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class); + private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>(); + public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) { if (workerService != null) { + METRIC_TYPES.forEach(metric -> metricType(out, metric)); + Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos(); @@ -86,10 +92,19 @@ public static void generate(WorkerService workerService, String cluster, SimpleT } } + private static void metricType(SimpleTextOutputStream stream, String name) { + stream.write("# TYPE ").write(name).write(" gauge\n"); + } + private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String functionName, String metricName, int instanceId, double value) { - 
stream.write(metricName).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace) - .write("\", name=\"").write(functionName).write("\", instanceId=\"").write(instanceId).write("\"} "); + if (!METRIC_TYPES.contains(metricName)) { + metricType(stream, metricName); + METRIC_TYPES.add(metricName); + } + + stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace) + .write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"} "); stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n'); } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java index d320514ed5..98168229bf 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -168,7 +168,7 @@ public void testFunctionsStatsGenerate() { Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); Arrays.asList(metrics.split("\n")).forEach(line -> { - if (line.isEmpty()) { + if (line.isEmpty() || line.startsWith("#")) { return; } Matcher matcher = pattern.matcher(line); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services