This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f682e105334bbae2fb644217f100f3c02978a661 Author: Tao Jiuming <[email protected]> AuthorDate: Tue Apr 19 21:26:18 2022 +0800 [enh][monitor]: add metrics for pulsar web service thread pool (#14742) Fixes https://github.com/apache/pulsar/issues/14459 See the issue 1. Add WebExecutorStats to record web thread pool metrics (cherry picked from commit 32d7a51936aac72a1b22d5ed1e41f1658a6c618c) --- .../PrometheusMetricsGeneratorUtils.java | 85 ++++++++++++++++++ .../apache/pulsar/broker/web/WebExecutorStats.java | 100 +++++++++++++++++++++ .../org/apache/pulsar/broker/web/WebService.java | 3 + .../pulsar/broker/stats/PrometheusMetricsTest.java | 6 +- .../apache/pulsar/broker/web/WebServiceTest.java | 44 ++++++++- 5 files changed, 233 insertions(+), 5 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java new file mode 100644 index 00000000000..399b9b826bc --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import java.util.Enumeration; +import org.apache.pulsar.common.util.SimpleTextOutputStream; + +/** + * Generate metrics in a text format suitable to be consumed by Prometheus. + * Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/} + */ +public class PrometheusMetricsGeneratorUtils { + + public static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) { + Enumeration<Collector.MetricFamilySamples> metricFamilySamples = + CollectorRegistry.defaultRegistry.metricFamilySamples(); + while (metricFamilySamples.hasMoreElements()) { + Collector.MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); + + // Write type of metric + stream.write("# TYPE ").write(metricFamily.name).write(' ') + .write(getTypeStr(metricFamily.type)).write('\n'); + + for (int i = 0; i < metricFamily.samples.size(); i++) { + Collector.MetricFamilySamples.Sample sample = metricFamily.samples.get(i); + stream.write(sample.name); + if (!sample.labelNames.contains("cluster")) { + stream.write("{cluster=\"").write(cluster).write('"'); + } + for (int j = 0; j < sample.labelNames.size(); j++) { + String labelValue = sample.labelValues.get(j); + if (labelValue != null) { + labelValue = labelValue.replace("\"", "\\\""); + } + + stream.write(","); + stream.write(sample.labelNames.get(j)); + stream.write("=\""); + stream.write(labelValue); + stream.write('"'); + } + + stream.write("} "); + stream.write(Collector.doubleToGoString(sample.value)); + stream.write('\n'); + } + } + } + + static String getTypeStr(Collector.Type type) { + switch (type) { + case COUNTER: + return "counter"; + case GAUGE: + return "gauge"; + case SUMMARY : + return "summary"; + case HISTOGRAM: + return "histogram"; + case UNTYPED: + default: + return "untyped"; + } + } + +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java new file mode 100644 index 00000000000..1c89318305b --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Gauge; +import java.util.concurrent.atomic.AtomicBoolean; + +class WebExecutorStats implements AutoCloseable { + private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + + private final Gauge maxThreads; + private final Gauge minThreads; + private final Gauge idleThreads; + private final Gauge activeThreads; + private final Gauge currentThreads; + private final WebExecutorThreadPool executor; + + private static volatile WebExecutorStats instance; + + static synchronized WebExecutorStats getStats(WebExecutorThreadPool executor) { + if (null == instance) { + instance = new WebExecutorStats(executor); + } + + return instance; + } + + private WebExecutorStats(WebExecutorThreadPool executor) { + this.executor = executor; + + this.maxThreads = Gauge.build("pulsar_web_executor_max_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getMaxThreads(); + } + }) + .register(); + + this.minThreads = Gauge.build("pulsar_web_executor_min_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getMinThreads(); + } + }) + .register(); + + this.idleThreads = Gauge.build("pulsar_web_executor_idle_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getIdleThreads(); + } + }) + .register(); + + this.activeThreads = Gauge.build("pulsar_web_executor_active_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getThreads() + - WebExecutorStats.this.executor.getIdleThreads(); + } + }) + .register(); + + this.currentThreads = Gauge.build("pulsar_web_executor_current_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getThreads(); + } + }) + .register(); + } + + @Override + public void close() throws Exception { + if (CLOSED.compareAndSet(false, true)) { + CollectorRegistry.defaultRegistry.unregister(this.activeThreads); + CollectorRegistry.defaultRegistry.unregister(this.maxThreads); + CollectorRegistry.defaultRegistry.unregister(this.minThreads); + CollectorRegistry.defaultRegistry.unregister(this.idleThreads); + CollectorRegistry.defaultRegistry.unregister(this.currentThreads); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 9033c50e933..ae33f368b85 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -67,6 +67,7 @@ public class WebService implements AutoCloseable { private final PulsarService pulsar; private final Server server; private final List<Handler> handlers; + private final WebExecutorStats executorStats; private final WebExecutorThreadPool webServiceExecutor; public final int maxConcurrentRequests; @@ -80,6 +81,7 @@ public class WebService implements AutoCloseable { this.webServiceExecutor = new WebExecutorThreadPool( pulsar.getConfiguration().getNumHttpServerThreads(), "pulsar-web"); + this.executorStats = WebExecutorStats.getStats(webServiceExecutor); this.server = new Server(webServiceExecutor); this.maxConcurrentRequests = pulsar.getConfiguration().getMaxConcurrentHttpRequests(); List<ServerConnector> connectors = new ArrayList<>(); @@ -281,6 +283,7 @@ public class WebService implements AutoCloseable { jettyStatisticsCollector = null; } webServiceExecutor.join(); + this.executorStats.close(); log.info("Web service closed"); } catch (Exception e) { throw new PulsarServerException(e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 9f098935efa..68dde0aaa26 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1317,9 +1317,9 @@ public class PrometheusMetricsTest extends BrokerTestBase { return parsed; } - static class Metric { - Map<String, String> tags = new TreeMap<>(); - double value; + public static class Metric { + public Map<String, String> tags = new TreeMap<>(); + public double value; @Override public String toString() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 3dacdebed38..1250dec684e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -24,12 +24,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.io.CharStreams; import com.google.common.io.Closeables; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; - +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -38,6 +39,7 @@ import java.security.KeyStore; import java.security.PrivateKey; import java.security.SecureRandom; import java.security.cert.Certificate; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -59,6 +61,8 @@ import org.apache.pulsar.broker.MockedBookKeeperClientFactory; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.stats.PrometheusMetricsTest; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -66,7 +70,6 @@ import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -99,6 +102,43 @@ public class WebServiceTest { private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt"; private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; + + @Test + public void testWebExecutorMetrics() throws Exception { + setupEnv(true, "1.0", true, false, false, false, -1, false); + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr); + + Collection<PrometheusMetricsTest.Metric> maxThreads = metrics.get("pulsar_web_executor_max_threads"); + Collection<PrometheusMetricsTest.Metric> minThreads = metrics.get("pulsar_web_executor_min_threads"); + Collection<PrometheusMetricsTest.Metric> activeThreads = metrics.get("pulsar_web_executor_active_threads"); + Collection<PrometheusMetricsTest.Metric> idleThreads = metrics.get("pulsar_web_executor_idle_threads"); + Collection<PrometheusMetricsTest.Metric> currentThreads = metrics.get("pulsar_web_executor_current_threads"); + + for (PrometheusMetricsTest.Metric metric : maxThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value > 0); + } + for (PrometheusMetricsTest.Metric metric : minThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value > 0); + } + for (PrometheusMetricsTest.Metric metric : activeThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value >= 0); + } + for (PrometheusMetricsTest.Metric metric : idleThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value >= 0); + } + for (PrometheusMetricsTest.Metric metric : currentThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value > 0); + } + } + /** * Test that the {@WebService} class properly passes the allowUnversionedClients value. We do this by setting * allowUnversionedClients to true, then making a request with no version, which should go through.
