This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 751b521edbc02544192e6b009abab0708c8bb0c7 Author: lipenghui <[email protected]> AuthorDate: Wed Dec 23 08:53:01 2020 +0800 Add raw prometheus metrics provider. (#9021) Pulsar supports plugins such as protocol handlers and broker interceptors. This PR adds a PrometheusRawMetricsProvider, which gives plugins the ability to add metrics to the broker /metrics endpoint. (cherry picked from commit b9493fe0aa42ac0eac1aadd20f169c9107acecc3) --- .../org/apache/pulsar/broker/PulsarService.java | 25 +++++++++++++++- .../stats/prometheus/NamespaceStatsAggregator.java | 1 - .../prometheus/PrometheusMetricsGenerator.java | 11 +++++++ .../stats/prometheus/PrometheusMetricsServlet.java | 13 ++++++++- .../prometheus/PrometheusRawMetricsProvider.java | 33 +++++++++++++++++++++ .../pulsar/broker/service/BrokerServiceTest.java | 34 ++++++++++++++++++++++ 6 files changed, 114 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 61c1cea..785ff43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -38,6 +38,7 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -95,6 +96,7 @@ import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.MetricsGenerator; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import 
org.apache.pulsar.broker.validator.MultipleListenerValidator; @@ -213,6 +215,9 @@ public class PulsarService implements AutoCloseable { private BrokerInterceptor brokerInterceptor; + private PrometheusMetricsServlet metricsServlet; + private List<PrometheusRawMetricsProvider> pendingMetricsProviders; + public enum State { Init, Started, Closed } @@ -499,9 +504,16 @@ public class PulsarService implements AutoCloseable { this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap); this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap); this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap); + this.metricsServlet = new PrometheusMetricsServlet( + this, config.isExposeTopicLevelMetricsInPrometheus(), + config.isExposeConsumerLevelMetricsInPrometheus()); + if (pendingMetricsProviders != null) { + pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider)); + this.pendingMetricsProviders = null; + } this.webService.addServlet("/metrics", - new ServletHolder(new PrometheusMetricsServlet(this, config.isExposeTopicLevelMetricsInPrometheus(), config.isExposeConsumerLevelMetricsInPrometheus())), + new ServletHolder(metricsServlet), false, attributeMap); if (config.isWebSocketServiceEnabled()) { @@ -1207,6 +1219,17 @@ public class PulsarService implements AutoCloseable { return topicPoliciesService; } + public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) { + if (metricsServlet == null) { + if (pendingMetricsProviders == null) { + pendingMetricsProviders = new LinkedList<>(); + } + pendingMetricsProviders.add(metricsProvider); + } else { + this.metricsServlet.addRawMetricsProvider(metricsProvider); + } + } + private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws InterruptedException, IOException, 
KeeperException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 5766091..ab2e670 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -56,7 +56,6 @@ public class NamespaceStatsAggregator { printDefaultBrokerStats(stream, cluster); LongAdder topicsCount = new LongAdder(); - pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> { namespaceStats.reset(); topicsCount.reset(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index b7fae4b..bd7ede4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.io.StringWriter; @@ -86,6 +87,11 @@ public class PrometheusMetricsGenerator { } public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, OutputStream out) throws IOException { + generate(pulsar, includeTopicMetrics, includeConsumerMetrics, out, null); + } + + public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, + OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders) throws IOException { ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); 
try { SimpleTextOutputStream stream = new SimpleTextOutputStream(buf); @@ -101,6 +107,11 @@ public class PrometheusMetricsGenerator { generateManagedLedgerBookieClientMetrics(pulsar, stream); + if (metricsProviders != null) { + for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) { + metricsProvider.generate(stream); + } + } out.write(buf.array(), buf.arrayOffset(), buf.readableBytes()); } finally { buf.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java index 12058c1..7fbfb75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java @@ -21,6 +21,8 @@ package org.apache.pulsar.broker.stats.prometheus; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import java.io.IOException; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -44,6 +46,7 @@ public class PrometheusMetricsServlet extends HttpServlet { private final PulsarService pulsar; private final boolean shouldExportTopicMetrics; private final boolean shouldExportConsumerMetrics; + private List<PrometheusRawMetricsProvider> metricsProviders; private ExecutorService executor = null; @@ -67,7 +70,8 @@ public class PrometheusMetricsServlet extends HttpServlet { try { res.setStatus(HttpStatus.OK_200); res.setContentType("text/plain"); - PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, res.getOutputStream()); + PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, + res.getOutputStream(), metricsProviders); context.complete(); } catch (IOException e) { @@ -85,5 +89,12 @@ 
public class PrometheusMetricsServlet extends HttpServlet { } } + public void addRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) { + if (metricsProviders == null) { + metricsProviders = new LinkedList<>(); + } + metricsProviders.add(metricsProvider); + } + private static final Logger log = LoggerFactory.getLogger(PrometheusMetricsServlet.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java new file mode 100644 index 0000000..d206776 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus; + +import org.apache.pulsar.common.util.SimpleTextOutputStream; + +/** + * The prometheus metrics provider for generate prometheus format metrics. + */ +public interface PrometheusRawMetricsProvider { + + /** + * Generate the metrics from the metrics provider. 
+ * @param stream the stream that write the metrics to + */ + void generate(SimpleTextOutputStream stream); +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 40fa9f9..97ddcca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -29,7 +29,10 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; @@ -51,8 +54,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; @@ -68,6 +77,8 @@ import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.TopicStats; import 
org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.util.SimpleTextOutputStream; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -962,4 +973,27 @@ public class BrokerServiceTest extends BrokerTestBase { } assertNull(ledgers.get(topicMlName)); } + + @Test + public void testMetricsProvider() throws IOException { + PrometheusRawMetricsProvider rawMetricsProvider = new PrometheusRawMetricsProvider() { + @Override + public void generate(SimpleTextOutputStream stream) { + stream.write("test_metrics{label1=\"xyz\"} 10 \n"); + } + }; + getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider); + HttpClient httpClient = HttpClientBuilder.create().build(); + final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics"; + HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint)); + InputStream inputStream = response.getEntity().getContent(); + InputStreamReader isReader = new InputStreamReader(inputStream); + BufferedReader reader = new BufferedReader(isReader); + StringBuffer sb = new StringBuffer(); + String str; + while((str = reader.readLine()) != null){ + sb.append(str); + } + Assert.assertTrue(sb.toString().contains("test_metrics")); + } }
