This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b2f54ba385b49779151baff0874dce41faaa3ca4 Author: Christophe Bornet <[email protected]> AuthorDate: Wed Mar 16 11:58:20 2022 +0100 Add support of PrometheusRawMetricsProvider for the Pulsar-Proxy (#14681) (cherry picked from commit ea95b28336acee8fe42a5dd8d95c92500dad08ea) --- .../PrometheusMetricsGeneratorUtils.java | 106 ++++++++++++ .../stats/prometheus/PrometheusMetricsServlet.java | 41 ++--- .../prometheus/PrometheusRawMetricsProvider.java | 0 .../broker/stats/prometheus/package-info.java | 14 -- .../org/apache/pulsar/broker/PulsarService.java | 37 +++-- .../prometheus/PrometheusMetricsGenerator.java | 54 +----- .../prometheus/PulsarPrometheusMetricsServlet.java | 52 ++++++ .../apache/pulsar/proxy/server/ProxyService.java | 37 +++++ .../pulsar/proxy/server/ProxyServiceStarter.java | 11 +- .../proxy/server/ProxyPrometheusMetricsTest.java | 181 +++++++++++++++++++++ 10 files changed, 425 insertions(+), 108 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java new file mode 100644 index 00000000000..0e298151971 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Enumeration; +import java.util.List; +import org.apache.pulsar.common.util.SimpleTextOutputStream; + +/** + * Generate metrics in a text format suitable to be consumed by Prometheus. + * See the <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">format specification</a>. + */ +public class PrometheusMetricsGeneratorUtils { + + public static void generate(String cluster, OutputStream out, + List<PrometheusRawMetricsProvider> metricsProviders) + throws IOException { + ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); + try { + SimpleTextOutputStream stream = new SimpleTextOutputStream(buf); + generateSystemMetrics(stream, cluster); + if (metricsProviders != null) { + for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) { + metricsProvider.generate(stream); + } + } + out.write(buf.array(), buf.arrayOffset(), buf.readableBytes()); + } finally { + buf.release(); + } + } + + public static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) { + Enumeration<Collector.MetricFamilySamples> metricFamilySamples = + CollectorRegistry.defaultRegistry.metricFamilySamples(); + while (metricFamilySamples.hasMoreElements()) { + Collector.MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); + 
// Write type of metric + stream.write("# TYPE ").write(metricFamily.name).write(' ') + .write(getTypeStr(metricFamily.type)).write('\n'); + + for (int i = 0; i < metricFamily.samples.size(); i++) { + Collector.MetricFamilySamples.Sample sample = metricFamily.samples.get(i); + stream.write(sample.name); + stream.write("{cluster=\"").write(cluster).write('"'); + for (int j = 0; j < sample.labelNames.size(); j++) { + String labelValue = sample.labelValues.get(j); + if (labelValue != null) { + labelValue = labelValue.replace("\"", "\\\""); + } + + stream.write(","); + stream.write(sample.labelNames.get(j)); + stream.write("=\""); + stream.write(labelValue); + stream.write('"'); + } + + stream.write("} "); + stream.write(Collector.doubleToGoString(sample.value)); + stream.write('\n'); + } + } + } + + static String getTypeStr(Collector.Type type) { + switch (type) { + case COUNTER: + return "counter"; + case GAUGE: + return "gauge"; + case SUMMARY : + return "summary"; + case HISTOGRAM: + return "histogram"; + case UNTYPED: + default: + return "untyped"; + } + } + +} + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java similarity index 71% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java rename to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 722c96c28a7..1c3277d5ee4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.stats.prometheus; -import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import static 
org.apache.bookkeeper.util.SafeRunnable.safeRun; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.EOFException; import java.io.IOException; @@ -28,36 +28,28 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.servlet.AsyncContext; import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.pulsar.broker.PulsarService; -import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PrometheusMetricsServlet extends HttpServlet { private static final long serialVersionUID = 1L; + private static final int HTTP_STATUS_OK_200 = 200; + private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500; - private final PulsarService pulsar; - private final boolean shouldExportTopicMetrics; - private final boolean shouldExportConsumerMetrics; - private final boolean shouldExportProducerMetrics; private final long metricsServletTimeoutMs; - private final boolean splitTopicAndPartitionLabel; - private List<PrometheusRawMetricsProvider> metricsProviders; + private final String cluster; + protected List<PrometheusRawMetricsProvider> metricsProviders; private ExecutorService executor = null; - public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, - boolean shouldExportProducerMetrics, boolean splitTopicAndPartitionLabel) { - this.pulsar = pulsar; - this.shouldExportTopicMetrics = includeTopicMetrics; - this.shouldExportConsumerMetrics = includeConsumerMetrics; - this.shouldExportProducerMetrics = shouldExportProducerMetrics; - this.metricsServletTimeoutMs = pulsar.getConfiguration().getMetricsServletTimeoutMs(); - this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel; + public PrometheusMetricsServlet(long 
metricsServletTimeoutMs, String cluster) { + this.metricsServletTimeoutMs = metricsServletTimeoutMs; + this.cluster = cluster; } @Override @@ -66,19 +58,16 @@ public class PrometheusMetricsServlet extends HttpServlet { } @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) - throws ServletException, IOException { + protected void doGet(HttpServletRequest request, HttpServletResponse response) { AsyncContext context = request.startAsync(); context.setTimeout(metricsServletTimeoutMs); executor.execute(safeRun(() -> { long start = System.currentTimeMillis(); HttpServletResponse res = (HttpServletResponse) context.getResponse(); try { - res.setStatus(HttpStatus.OK_200); + res.setStatus(HTTP_STATUS_OK_200); res.setContentType("text/plain"); - PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, - shouldExportProducerMetrics, splitTopicAndPartitionLabel, res.getOutputStream(), - metricsProviders); + generateMetrics(cluster, res.getOutputStream()); } catch (Exception e) { long end = System.currentTimeMillis(); long time = end - start; @@ -90,7 +79,7 @@ public class PrometheusMetricsServlet extends HttpServlet { } else { log.error("Failed to generate prometheus stats, {} ms elapsed", time, e); } - res.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500); + res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500); } finally { long end = System.currentTimeMillis(); long time = end - start; @@ -106,6 +95,10 @@ public class PrometheusMetricsServlet extends HttpServlet { })); } + protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException { + PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, metricsProviders); + } + @Override public void destroy() { if (executor != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java similarity index 100% copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java copy to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java similarity index 69% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java rename to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java index d2067766a41..3723fb4ff5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java @@ -17,17 +17,3 @@ * under the License. */ package org.apache.pulsar.broker.stats.prometheus; - -import org.apache.pulsar.common.util.SimpleTextOutputStream; - -/** - * The prometheus metrics provider for generate prometheus format metrics. - */ -public interface PrometheusRawMetricsProvider { - - /** - * Generate the metrics from the metrics provider. 
- * @param stream the stream that write the metrics to - */ - void generate(SimpleTextOutputStream stream); -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e7eb8a09986..08fff1eea36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -106,8 +106,8 @@ import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TransactionBufferSnapshotService; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.MetricsGenerator; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; +import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet; import org.apache.pulsar.broker.storage.ManagedLedgerStorage; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; @@ -245,7 +245,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { // packages management service private Optional<PackagesManagement> packagesManagement = Optional.empty(); - private PrometheusMetricsServlet metricsServlet; + private PulsarPrometheusMetricsServlet metricsServlet; private List<PrometheusRawMetricsProvider> pendingMetricsProviders; private MetadataStoreExtended localMetadataStore; @@ -414,7 +414,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { } } - metricsServlet = null; + resetMetricsServlet(); if (this.webSocketService != null) { this.webSocketService.close(); @@ -571,6 +571,10 @@ public class PulsarService implements AutoCloseable, ShutdownService { } } + private synchronized void resetMetricsServlet() { + 
metricsServlet = null; + } + private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) { ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor( new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown")); @@ -698,16 +702,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { this.brokerAdditionalServlets = AdditionalServlets.load(config); this.webService = new WebService(this); - this.metricsServlet = new PrometheusMetricsServlet( - this, config.isExposeTopicLevelMetricsInPrometheus(), - config.isExposeConsumerLevelMetricsInPrometheus(), - config.isExposeProducerLevelMetricsInPrometheus(), - config.isSplitTopicAndPartitionLabelInPrometheus()); - if (pendingMetricsProviders != null) { - pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider)); - this.pendingMetricsProviders = null; - } - + createMetricsServlet(); this.addWebServerHandlers(webService, metricsServlet, this.config); this.webService.start(); heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config); @@ -827,8 +822,20 @@ public class PulsarService implements AutoCloseable, ShutdownService { } } + private synchronized void createMetricsServlet() { + this.metricsServlet = new PulsarPrometheusMetricsServlet( + this, config.isExposeTopicLevelMetricsInPrometheus(), + config.isExposeConsumerLevelMetricsInPrometheus(), + config.isExposeProducerLevelMetricsInPrometheus(), + config.isSplitTopicAndPartitionLabelInPrometheus()); + if (pendingMetricsProviders != null) { + pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider)); + this.pendingMetricsProviders = null; + } + } + private void addWebServerHandlers(WebService webService, - PrometheusMetricsServlet metricsServlet, + PulsarPrometheusMetricsServlet metricsServlet, ServiceConfiguration config) throws PulsarServerException, PulsarClientException, 
MalformedURLException, ServletException, DeploymentException { @@ -1525,7 +1532,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { return resourceUsageTransportManager; } - public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) { + public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) { if (metricsServlet == null) { if (pendingMetricsProviders == null) { pendingMetricsProviders = new LinkedList<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 9d5e1c77c69..a585601d545 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -18,12 +18,12 @@ */ package org.apache.pulsar.broker.stats.prometheus; +import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.generateSystemMetrics; +import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr; import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.prometheus.client.Collector; -import io.prometheus.client.Collector.MetricFamilySamples; -import io.prometheus.client.Collector.MetricFamilySamples.Sample; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import io.prometheus.client.Gauge.Child; @@ -34,7 +34,6 @@ import java.io.StringWriter; import java.io.Writer; import java.util.Collection; import java.util.Collections; -import java.util.Enumeration; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -230,53 +229,4 @@ public class 
PrometheusMetricsGenerator { } } - private static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) { - Enumeration<MetricFamilySamples> metricFamilySamples = CollectorRegistry.defaultRegistry.metricFamilySamples(); - while (metricFamilySamples.hasMoreElements()) { - MetricFamilySamples metricFamily = metricFamilySamples.nextElement(); - - // Write type of metric - stream.write("# TYPE ").write(metricFamily.name).write(' ') - .write(getTypeStr(metricFamily.type)).write('\n'); - - for (int i = 0; i < metricFamily.samples.size(); i++) { - Sample sample = metricFamily.samples.get(i); - stream.write(sample.name); - stream.write("{cluster=\"").write(cluster).write('"'); - for (int j = 0; j < sample.labelNames.size(); j++) { - String labelValue = sample.labelValues.get(j); - if (labelValue != null) { - labelValue = labelValue.replace("\"", "\\\""); - } - - stream.write(","); - stream.write(sample.labelNames.get(j)); - stream.write("=\""); - stream.write(labelValue); - stream.write('"'); - } - - stream.write("} "); - stream.write(Collector.doubleToGoString(sample.value)); - stream.write('\n'); - } - } - } - - static String getTypeStr(Collector.Type type) { - switch (type) { - case COUNTER: - return "counter"; - case GAUGE: - return "gauge"; - case SUMMARY : - return "summary"; - case HISTOGRAM: - return "histogram"; - case UNTYPED: - default: - return "untyped"; - } - } - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java new file mode 100644 index 00000000000..3d64f22d630 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus; + +import java.io.IOException; +import javax.servlet.ServletOutputStream; +import org.apache.pulsar.broker.PulsarService; + +public class PulsarPrometheusMetricsServlet extends PrometheusMetricsServlet { + + private static final long serialVersionUID = 1L; + + private final PulsarService pulsar; + private final boolean shouldExportTopicMetrics; + private final boolean shouldExportConsumerMetrics; + private final boolean shouldExportProducerMetrics; + private final boolean splitTopicAndPartitionLabel; + + public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, + boolean includeConsumerMetrics, boolean shouldExportProducerMetrics, + boolean splitTopicAndPartitionLabel) { + super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), pulsar.getConfiguration().getClusterName()); + this.pulsar = pulsar; + this.shouldExportTopicMetrics = includeTopicMetrics; + this.shouldExportConsumerMetrics = includeConsumerMetrics; + this.shouldExportProducerMetrics = shouldExportProducerMetrics; + this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel; + } + + @Override + protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException { + 
PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, + shouldExportProducerMetrics, splitTopicAndPartitionLabel, outputStream, + metricsProviders); + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 8bb1ff752f1..6a830657423 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -54,6 +55,8 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; @@ -134,6 +137,9 @@ public class ProxyService implements Closeable { @Getter private AdditionalServlets proxyAdditionalServlets; + private PrometheusMetricsServlet metricsServlet; + private List<PrometheusRawMetricsProvider> pendingMetricsProviders; + public ProxyService(ProxyConfiguration proxyConfig, AuthenticationService authenticationService) throws Exception { requireNonNull(proxyConfig); @@ -250,6 +256,8 @@ public class ProxyService implements Closeable { this.serviceUrlTls = null; } + createMetricsServlet(); + // Initialize the message protocol handlers. 
// start the protocol handlers only after the broker is ready, // so that the protocol handlers can access broker service properly. @@ -259,6 +267,14 @@ public class ProxyService implements Closeable { startProxyExtensions(protocolHandlerChannelInitializers, bootstrap); } + private synchronized void createMetricsServlet() { + this.metricsServlet = new PrometheusMetricsServlet(-1L, proxyConfig.getClusterName()); + if (pendingMetricsProviders != null) { + pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider)); + this.pendingMetricsProviders = null; + } + } + // This call is used for starting additional protocol handlers public void startProxyExtensions(Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) { @@ -335,6 +351,8 @@ public class ProxyService implements Closeable { proxyAdditionalServlets = null; } + resetMetricsServlet(); + if (localMetadataStore != null) { try { localMetadataStore.close(); @@ -356,6 +374,10 @@ public class ProxyService implements Closeable { } } + private synchronized void resetMetricsServlet() { + metricsServlet = null; + } + public String getServiceUrl() { return serviceUrl; } @@ -414,5 +436,20 @@ public class ProxyService implements Closeable { return this.proxyClientAuthentication; } + public synchronized PrometheusMetricsServlet getMetricsServlet() { + return metricsServlet; + } + + public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) { + if (metricsServlet == null) { + if (pendingMetricsProviders == null) { + pendingMetricsProviders = new LinkedList<>(); + } + pendingMetricsProviders.add(metricsProvider); + } else { + this.metricsServlet.addRawMetricsProvider(metricsProvider); + } + } + private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 32b6fa13314..4df36229b01 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -32,7 +32,6 @@ import io.netty.util.internal.PlatformDependent; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import io.prometheus.client.Gauge.Child; -import io.prometheus.client.exporter.MetricsServlet; import io.prometheus.client.hotspot.DefaultExports; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -44,6 +43,7 @@ import org.apache.logging.log4j.core.util.datetime.FixedDateFormat; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; @@ -240,8 +240,13 @@ public class ProxyServiceStarter { ProxyConfiguration config, ProxyService service, BrokerDiscoveryProvider discoveryProvider) throws Exception { - server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), - Collections.emptyList(), config.isAuthenticateMetricsEndpoint()); + if (service != null) { + PrometheusMetricsServlet metricsServlet = service.getMetricsServlet(); + if (metricsServlet != null) { + server.addServlet("/metrics", new ServletHolder(metricsServlet), + Collections.emptyList(), config.isAuthenticateMetricsEndpoint()); + } + } server.addRestResources("/", VipStatus.class.getPackage().getName(), VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath()); server.addRestResources("/proxy-stats", 
ProxyStats.class.getPackage().getName(), diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java new file mode 100644 index 00000000000..63ac43d3210 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.server; + +import static org.mockito.Mockito.doReturn; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import com.google.common.base.MoreObjects; +import com.google.common.base.Splitter; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import io.prometheus.client.Counter; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.logging.LoggingFeature; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest { + + public static final String TEST_CLUSTER = "test-cluster"; + private ProxyService proxyService; + private WebServer proxyWebServer; + private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + + @Override + @BeforeClass + protected void setup() throws Exception { + internalSetup(); + + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); + proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setClusterName(TEST_CLUSTER); + + proxyService = Mockito.spy(new ProxyService(proxyConfig, + new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + + proxyService.start(); + + proxyService.addPrometheusRawMetricsProvider(stream -> stream.write("test_metrics{label1=\"xyz\"} 10 \n")); + + AuthenticationService authService = new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig)); + + proxyWebServer = new WebServer(proxyConfig, authService); + ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); + proxyWebServer.start(); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + internalCleanup(); + proxyWebServer.stop(); + proxyService.close(); + } + + /** + * Validates proxy Prometheus endpoint. + */ + @Test + public void testMetrics() { + Counter.build("test_counter", "a test counter").create().register(); + + Client httpClient = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); + Response r = httpClient.target(proxyWebServer.getServiceUri()).path("/metrics").request() + .get(); + Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode()); + String response = r.readEntity(String.class).trim(); + + Multimap<String, Metric> metrics = parseMetrics(response); + + // Check that ProxyService metrics are present + List<Metric> cm = (List<Metric>) metrics.get("pulsar_proxy_binary_bytes"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("cluster"), TEST_CLUSTER); + + // Check that PrometheusRawMetricsProvider metrics are present + List<Metric> cm2 = (List<Metric>) metrics.get("test_metrics"); + assertEquals(cm2.size(), 1); + assertEquals(cm2.get(0).tags.get("label1"), "xyz"); + + // Check that any Prometheus metric registered in the default CollectorRegistry is present + List<Metric> cm3 = 
(List<Metric>) metrics.get("test_counter"); + assertEquals(cm3.size(), 1); + assertEquals(cm3.get(0).tags.get("cluster"), TEST_CLUSTER); + } + + /** + * Hacky parsing of Prometheus text format. Should be good enough for unit tests + */ + public static Multimap<String, Metric> parseMetrics(String metrics) { + Multimap<String, Metric> parsed = ArrayListMultimap.create(); + + // Example of lines are + // jvm_threads_current{cluster="standalone",} 203.0 + // or + // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1", + // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897 + Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$"); + Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?"); + + Splitter.on("\n").split(metrics).forEach(line -> { + if (line.isEmpty() || line.startsWith("#")) { + return; + } + + Matcher matcher = pattern.matcher(line); + assertTrue(matcher.matches(), "line " + line + " does not match pattern " + pattern); + String name = matcher.group(1); + + Metric m = new Metric(); + String numericValue = matcher.group(3); + if (numericValue.equalsIgnoreCase("-Inf")) { + m.value = Double.NEGATIVE_INFINITY; + } else if (numericValue.equalsIgnoreCase("+Inf")) { + m.value = Double.POSITIVE_INFINITY; + } else { + m.value = Double.parseDouble(numericValue); + } + String tags = matcher.group(2); + Matcher tagsMatcher = tagsPattern.matcher(tags); + while (tagsMatcher.find()) { + String tag = tagsMatcher.group(1); + String value = tagsMatcher.group(2); + m.tags.put(tag, value); + } + + parsed.put(name, m); + }); + + return parsed; + } + + static class Metric { + Map<String, String> tags = new TreeMap<>(); + double value; + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("tags", tags).add("value", value).toString(); + } + } + +}
