This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a6eb62286fdc4fb0e6f00d94305ae604790d5cab Author: Tao Jiuming <95597048+tjium...@users.noreply.github.com> AuthorDate: Tue Apr 19 21:26:18 2022 +0800 [enh][monitor]: add metrics for pulsar web service thread pool (#14742) Fixes https://github.com/apache/pulsar/issues/14459 See the issue 1. Add WebExecutorStats to record web thread pool metrics (cherry picked from commit 32d7a51936aac72a1b22d5ed1e41f1658a6c618c) --- .../PrometheusMetricsGeneratorUtils.java | 4 +- .../apache/pulsar/broker/web/WebExecutorStats.java | 100 +++++++++++++++++++++ .../org/apache/pulsar/broker/web/WebService.java | 3 + .../apache/pulsar/broker/web/WebServiceTest.java | 42 +++++++++ 4 files changed, 148 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java index 0e298151971..ead3c332b2b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java @@ -65,7 +65,9 @@ public class PrometheusMetricsGeneratorUtils { for (int i = 0; i < metricFamily.samples.size(); i++) { Collector.MetricFamilySamples.Sample sample = metricFamily.samples.get(i); stream.write(sample.name); - stream.write("{cluster=\"").write(cluster).write('"'); + if (!sample.labelNames.contains("cluster")) { + stream.write("{cluster=\"").write(cluster).write('"'); + } for (int j = 0; j < sample.labelNames.size(); j++) { String labelValue = sample.labelValues.get(j); if (labelValue != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java new file mode 100644 index 00000000000..1c89318305b --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Gauge; +import java.util.concurrent.atomic.AtomicBoolean; + +class WebExecutorStats implements AutoCloseable { + private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + + private final Gauge maxThreads; + private final Gauge minThreads; + private final Gauge idleThreads; + private final Gauge activeThreads; + private final Gauge currentThreads; + private final WebExecutorThreadPool executor; + + private static volatile WebExecutorStats instance; + + static synchronized WebExecutorStats getStats(WebExecutorThreadPool executor) { + if (null == instance) { + instance = new WebExecutorStats(executor); + } + + return instance; + } + + private WebExecutorStats(WebExecutorThreadPool executor) { + this.executor = executor; + + this.maxThreads = Gauge.build("pulsar_web_executor_max_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getMaxThreads(); + } + }) + .register(); + + this.minThreads = Gauge.build("pulsar_web_executor_min_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getMinThreads(); + } + }) + .register(); + + this.idleThreads = Gauge.build("pulsar_web_executor_idle_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getIdleThreads(); + } + }) + .register(); + + this.activeThreads = Gauge.build("pulsar_web_executor_active_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getThreads() + - WebExecutorStats.this.executor.getIdleThreads(); + } + }) + .register(); + + this.currentThreads = Gauge.build("pulsar_web_executor_current_threads", "-").create() + .setChild(new Gauge.Child() { + public double get() { + return WebExecutorStats.this.executor.getThreads(); + } + }) + .register(); + } + + @Override + public void close() throws Exception { + if (CLOSED.compareAndSet(false, true)) { + CollectorRegistry.defaultRegistry.unregister(this.activeThreads); + CollectorRegistry.defaultRegistry.unregister(this.maxThreads); + CollectorRegistry.defaultRegistry.unregister(this.minThreads); + CollectorRegistry.defaultRegistry.unregister(this.idleThreads); + CollectorRegistry.defaultRegistry.unregister(this.currentThreads); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 7e6b1636a5c..fc800880b83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -66,6 +66,7 @@ public class WebService implements AutoCloseable { private final PulsarService pulsar; private final Server server; private final List<Handler> handlers; + private final WebExecutorStats executorStats; private final WebExecutorThreadPool webServiceExecutor; public final int maxConcurrentRequests; @@ -79,6 +80,7 @@ public class WebService implements AutoCloseable { this.webServiceExecutor = new WebExecutorThreadPool( pulsar.getConfiguration().getNumHttpServerThreads(), "pulsar-web"); + this.executorStats = WebExecutorStats.getStats(webServiceExecutor); this.server = new Server(webServiceExecutor); this.maxConcurrentRequests = pulsar.getConfiguration().getMaxConcurrentHttpRequests(); List<ServerConnector> connectors = new ArrayList<>(); @@ -273,6 +275,7 @@ public class WebService implements AutoCloseable { jettyStatisticsCollector = null; } webServiceExecutor.join(); + this.executorStats.close(); log.info("Web service closed"); } catch (Exception e) { throw new PulsarServerException(e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index ae611d7e4eb..0cd72f19a4d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -23,10 +23,12 @@ import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.io.CharStreams; import com.google.common.io.Closeables; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -35,6 +37,7 @@ import java.security.KeyStore; import java.security.PrivateKey; import java.security.SecureRandom; import java.security.cert.Certificate; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -52,6 +55,8 @@ import org.apache.pulsar.broker.MockedBookKeeperClientFactory; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.stats.PrometheusMetricsTest; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -88,6 +93,43 @@ public class WebServiceTest { private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt"; private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; + + @Test + public void testWebExecutorMetrics() throws Exception { + setupEnv(true, "1.0", true, false, false, false, -1, false); + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr); + + Collection<PrometheusMetricsTest.Metric> maxThreads = metrics.get("pulsar_web_executor_max_threads"); + Collection<PrometheusMetricsTest.Metric> minThreads = metrics.get("pulsar_web_executor_min_threads"); + Collection<PrometheusMetricsTest.Metric> activeThreads = metrics.get("pulsar_web_executor_active_threads"); + Collection<PrometheusMetricsTest.Metric> idleThreads = metrics.get("pulsar_web_executor_idle_threads"); + Collection<PrometheusMetricsTest.Metric> currentThreads = metrics.get("pulsar_web_executor_current_threads"); + + for (PrometheusMetricsTest.Metric metric : maxThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value > 0); + } + for (PrometheusMetricsTest.Metric metric : minThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value > 0); + } + for (PrometheusMetricsTest.Metric metric : activeThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value >= 0); + } + for (PrometheusMetricsTest.Metric metric : idleThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value >= 0); + } + for (PrometheusMetricsTest.Metric metric : currentThreads) { + Assert.assertNotNull(metric.tags.get("cluster")); + Assert.assertTrue(metric.value > 0); + } + } + /** * Test that the {@WebService} class properly passes the allowUnversionedClients value. We do this by setting * allowUnversionedClients to true, then making a request with no version, which should go through.