This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b2f54ba385b49779151baff0874dce41faaa3ca4
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Mar 16 11:58:20 2022 +0100

    Add support of PrometheusRawMetricsProvider for the Pulsar-Proxy (#14681)
    
    (cherry picked from commit ea95b28336acee8fe42a5dd8d95c92500dad08ea)
---
 .../PrometheusMetricsGeneratorUtils.java           | 106 ++++++++++++
 .../stats/prometheus/PrometheusMetricsServlet.java |  41 ++---
 .../prometheus/PrometheusRawMetricsProvider.java   |   0
 .../broker/stats/prometheus/package-info.java      |  14 --
 .../org/apache/pulsar/broker/PulsarService.java    |  37 +++--
 .../prometheus/PrometheusMetricsGenerator.java     |  54 +-----
 .../prometheus/PulsarPrometheusMetricsServlet.java |  52 ++++++
 .../apache/pulsar/proxy/server/ProxyService.java   |  37 +++++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  11 +-
 .../proxy/server/ProxyPrometheusMetricsTest.java   | 181 +++++++++++++++++++++
 10 files changed, 425 insertions(+), 108 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
new file mode 100644
index 00000000000..0e298151971
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Enumeration;
+import java.util.List;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Generate metrics in a text format suitable to be consumed by Prometheus.
+ * @see <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">Prometheus exposition formats</a>
+ */
+public class PrometheusMetricsGeneratorUtils {
+
+    public static void generate(String cluster, OutputStream out,
+                                List<PrometheusRawMetricsProvider> 
metricsProviders)
+            throws IOException {
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        try {
+            SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
+            generateSystemMetrics(stream, cluster);
+            if (metricsProviders != null) {
+                for (PrometheusRawMetricsProvider metricsProvider : 
metricsProviders) {
+                    metricsProvider.generate(stream);
+                }
+            }
+            out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
+        } finally {
+            buf.release();
+        }
+    }
+
+    public static void generateSystemMetrics(SimpleTextOutputStream stream, 
String cluster) {
+        Enumeration<Collector.MetricFamilySamples> metricFamilySamples =
+                CollectorRegistry.defaultRegistry.metricFamilySamples();
+        while (metricFamilySamples.hasMoreElements()) {
+            Collector.MetricFamilySamples metricFamily = 
metricFamilySamples.nextElement();
+
+            // Write type of metric
+            stream.write("# TYPE ").write(metricFamily.name).write(' ')
+                    .write(getTypeStr(metricFamily.type)).write('\n');
+
+            for (int i = 0; i < metricFamily.samples.size(); i++) {
+                Collector.MetricFamilySamples.Sample sample = 
metricFamily.samples.get(i);
+                stream.write(sample.name);
+                stream.write("{cluster=\"").write(cluster).write('"');
+                for (int j = 0; j < sample.labelNames.size(); j++) {
+                    String labelValue = sample.labelValues.get(j);
+                    if (labelValue != null) {
+                        labelValue = labelValue.replace("\"", "\\\"");
+                    }
+
+                    stream.write(",");
+                    stream.write(sample.labelNames.get(j));
+                    stream.write("=\"");
+                    stream.write(labelValue);
+                    stream.write('"');
+                }
+
+                stream.write("} ");
+                stream.write(Collector.doubleToGoString(sample.value));
+                stream.write('\n');
+            }
+        }
+    }
+
+    static String getTypeStr(Collector.Type type) {
+        switch (type) {
+            case COUNTER:
+                return "counter";
+            case GAUGE:
+                return "gauge";
+            case SUMMARY        :
+                return "summary";
+            case HISTOGRAM:
+                return "histogram";
+            case UNTYPED:
+            default:
+                return "untyped";
+        }
+    }
+
+}
+
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
similarity index 71%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
rename to 
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 722c96c28a7..1c3277d5ee4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.EOFException;
 import java.io.IOException;
@@ -28,36 +28,28 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import javax.servlet.AsyncContext;
 import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import org.apache.pulsar.broker.PulsarService;
-import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PrometheusMetricsServlet extends HttpServlet {
 
     private static final long serialVersionUID = 1L;
+    private static final int HTTP_STATUS_OK_200 = 200;
+    private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
 
-    private final PulsarService pulsar;
-    private final boolean shouldExportTopicMetrics;
-    private final boolean shouldExportConsumerMetrics;
-    private final boolean shouldExportProducerMetrics;
     private final long metricsServletTimeoutMs;
-    private final boolean splitTopicAndPartitionLabel;
-    private List<PrometheusRawMetricsProvider> metricsProviders;
+    private final String cluster;
+    protected List<PrometheusRawMetricsProvider> metricsProviders;
 
     private ExecutorService executor = null;
 
-    public PrometheusMetricsServlet(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
-                                    boolean shouldExportProducerMetrics, 
boolean splitTopicAndPartitionLabel) {
-        this.pulsar = pulsar;
-        this.shouldExportTopicMetrics = includeTopicMetrics;
-        this.shouldExportConsumerMetrics = includeConsumerMetrics;
-        this.shouldExportProducerMetrics = shouldExportProducerMetrics;
-        this.metricsServletTimeoutMs = 
pulsar.getConfiguration().getMetricsServletTimeoutMs();
-        this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
+    public PrometheusMetricsServlet(long metricsServletTimeoutMs, String 
cluster) {
+        this.metricsServletTimeoutMs = metricsServletTimeoutMs;
+        this.cluster = cluster;
     }
 
     @Override
@@ -66,19 +58,16 @@ public class PrometheusMetricsServlet extends HttpServlet {
     }
 
     @Override
-    protected void doGet(HttpServletRequest request, HttpServletResponse 
response)
-            throws ServletException, IOException {
+    protected void doGet(HttpServletRequest request, HttpServletResponse 
response) {
         AsyncContext context = request.startAsync();
         context.setTimeout(metricsServletTimeoutMs);
         executor.execute(safeRun(() -> {
             long start = System.currentTimeMillis();
             HttpServletResponse res = (HttpServletResponse) 
context.getResponse();
             try {
-                res.setStatus(HttpStatus.OK_200);
+                res.setStatus(HTTP_STATUS_OK_200);
                 res.setContentType("text/plain");
-                PrometheusMetricsGenerator.generate(pulsar, 
shouldExportTopicMetrics, shouldExportConsumerMetrics,
-                        shouldExportProducerMetrics, 
splitTopicAndPartitionLabel, res.getOutputStream(),
-                        metricsProviders);
+                generateMetrics(cluster, res.getOutputStream());
             } catch (Exception e) {
                 long end = System.currentTimeMillis();
                 long time = end - start;
@@ -90,7 +79,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
                 } else {
                     log.error("Failed to generate prometheus stats, {} ms 
elapsed", time, e);
                 }
-                res.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
+                res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
             } finally {
                 long end = System.currentTimeMillis();
                 long time = end - start;
@@ -106,6 +95,10 @@ public class PrometheusMetricsServlet extends HttpServlet {
         }));
     }
 
+    protected void generateMetrics(String cluster, ServletOutputStream 
outputStream) throws IOException {
+        PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, 
metricsProviders);
+    }
+
     @Override
     public void destroy() {
         if (executor != null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
similarity index 100%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
copy to 
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java
similarity index 69%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
rename to 
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java
index d2067766a41..3723fb4ff5c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusRawMetricsProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/package-info.java
@@ -17,17 +17,3 @@
  * under the License.
  */
 package org.apache.pulsar.broker.stats.prometheus;
-
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-
-/**
- * The prometheus metrics provider for generate prometheus format metrics.
- */
-public interface PrometheusRawMetricsProvider {
-
-    /**
-     * Generate the metrics from the metrics provider.
-     * @param stream the stream that write the metrics to
-     */
-    void generate(SimpleTextOutputStream stream);
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e7eb8a09986..08fff1eea36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -106,8 +106,8 @@ import 
org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import 
org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
@@ -245,7 +245,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
     // packages management service
     private Optional<PackagesManagement> packagesManagement = Optional.empty();
-    private PrometheusMetricsServlet metricsServlet;
+    private PulsarPrometheusMetricsServlet metricsServlet;
     private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
 
     private MetadataStoreExtended localMetadataStore;
@@ -414,7 +414,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 }
             }
 
-            metricsServlet = null;
+            resetMetricsServlet();
 
             if (this.webSocketService != null) {
                 this.webSocketService.close();
@@ -571,6 +571,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         }
     }
 
+    private synchronized void resetMetricsServlet() {
+        metricsServlet = null;
+    }
+
     private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> 
future) {
         ScheduledExecutorService shutdownExecutor = 
Executors.newSingleThreadScheduledExecutor(
                 new DefaultThreadFactory(getClass().getSimpleName() + 
"-shutdown"));
@@ -698,16 +702,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             this.brokerAdditionalServlets = AdditionalServlets.load(config);
 
             this.webService = new WebService(this);
-            this.metricsServlet = new PrometheusMetricsServlet(
-                    this, config.isExposeTopicLevelMetricsInPrometheus(),
-                    config.isExposeConsumerLevelMetricsInPrometheus(),
-                    config.isExposeProducerLevelMetricsInPrometheus(),
-                    config.isSplitTopicAndPartitionLabelInPrometheus());
-            if (pendingMetricsProviders != null) {
-                pendingMetricsProviders.forEach(provider -> 
metricsServlet.addRawMetricsProvider(provider));
-                this.pendingMetricsProviders = null;
-            }
-
+            createMetricsServlet();
             this.addWebServerHandlers(webService, metricsServlet, this.config);
             this.webService.start();
             heartbeatNamespaceV1 = 
NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
@@ -827,8 +822,20 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         }
     }
 
+    private synchronized void createMetricsServlet() {
+        this.metricsServlet = new PulsarPrometheusMetricsServlet(
+                this, config.isExposeTopicLevelMetricsInPrometheus(),
+                config.isExposeConsumerLevelMetricsInPrometheus(),
+                config.isExposeProducerLevelMetricsInPrometheus(),
+                config.isSplitTopicAndPartitionLabelInPrometheus());
+        if (pendingMetricsProviders != null) {
+            pendingMetricsProviders.forEach(provider -> 
metricsServlet.addRawMetricsProvider(provider));
+            this.pendingMetricsProviders = null;
+        }
+    }
+
     private void addWebServerHandlers(WebService webService,
-                                      PrometheusMetricsServlet metricsServlet,
+                                      PulsarPrometheusMetricsServlet 
metricsServlet,
                                       ServiceConfiguration config)
             throws PulsarServerException, PulsarClientException, 
MalformedURLException, ServletException,
             DeploymentException {
@@ -1525,7 +1532,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return resourceUsageTransportManager;
     }
 
-    public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider 
metricsProvider) {
+    public synchronized void 
addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
         if (metricsServlet == null) {
             if (pendingMetricsProviders == null) {
                 pendingMetricsProviders = new LinkedList<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 9d5e1c77c69..a585601d545 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.generateSystemMetrics;
+import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.prometheus.client.Collector;
-import io.prometheus.client.Collector.MetricFamilySamples;
-import io.prometheus.client.Collector.MetricFamilySamples.Sample;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
@@ -34,7 +34,6 @@ import java.io.StringWriter;
 import java.io.Writer;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -230,53 +229,4 @@ public class PrometheusMetricsGenerator {
         }
     }
 
-    private static void generateSystemMetrics(SimpleTextOutputStream stream, 
String cluster) {
-        Enumeration<MetricFamilySamples> metricFamilySamples = 
CollectorRegistry.defaultRegistry.metricFamilySamples();
-        while (metricFamilySamples.hasMoreElements()) {
-            MetricFamilySamples metricFamily = 
metricFamilySamples.nextElement();
-
-            // Write type of metric
-            stream.write("# TYPE ").write(metricFamily.name).write(' ')
-                    .write(getTypeStr(metricFamily.type)).write('\n');
-
-            for (int i = 0; i < metricFamily.samples.size(); i++) {
-                Sample sample = metricFamily.samples.get(i);
-                stream.write(sample.name);
-                stream.write("{cluster=\"").write(cluster).write('"');
-                for (int j = 0; j < sample.labelNames.size(); j++) {
-                    String labelValue = sample.labelValues.get(j);
-                    if (labelValue != null) {
-                        labelValue = labelValue.replace("\"", "\\\"");
-                    }
-
-                    stream.write(",");
-                    stream.write(sample.labelNames.get(j));
-                    stream.write("=\"");
-                    stream.write(labelValue);
-                    stream.write('"');
-                }
-
-                stream.write("} ");
-                stream.write(Collector.doubleToGoString(sample.value));
-                stream.write('\n');
-            }
-        }
-    }
-
-    static String getTypeStr(Collector.Type type) {
-        switch (type) {
-        case COUNTER:
-            return "counter";
-        case GAUGE:
-            return "gauge";
-        case SUMMARY        :
-            return "summary";
-        case HISTOGRAM:
-            return "histogram";
-        case UNTYPED:
-        default:
-            return "untyped";
-        }
-    }
-
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
new file mode 100644
index 00000000000..3d64f22d630
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import java.io.IOException;
+import javax.servlet.ServletOutputStream;
+import org.apache.pulsar.broker.PulsarService;
+
+public class PulsarPrometheusMetricsServlet extends PrometheusMetricsServlet {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarService pulsar;
+    private final boolean shouldExportTopicMetrics;
+    private final boolean shouldExportConsumerMetrics;
+    private final boolean shouldExportProducerMetrics;
+    private final boolean splitTopicAndPartitionLabel;
+
+    public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean 
includeTopicMetrics,
+                                          boolean includeConsumerMetrics, 
boolean shouldExportProducerMetrics,
+                                          boolean splitTopicAndPartitionLabel) 
{
+        super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), 
pulsar.getConfiguration().getClusterName());
+        this.pulsar = pulsar;
+        this.shouldExportTopicMetrics = includeTopicMetrics;
+        this.shouldExportConsumerMetrics = includeConsumerMetrics;
+        this.shouldExportProducerMetrics = shouldExportProducerMetrics;
+        this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
+    }
+
+    @Override
+    protected void generateMetrics(String cluster, ServletOutputStream 
outputStream) throws IOException {
+        PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, 
shouldExportConsumerMetrics,
+                shouldExportProducerMetrics, splitTopicAndPartitionLabel, 
outputStream,
+                metricsProviders);
+    }
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 8bb1ff752f1..6a830657423 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -54,6 +55,8 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -134,6 +137,9 @@ public class ProxyService implements Closeable {
     @Getter
     private AdditionalServlets proxyAdditionalServlets;
 
+    private PrometheusMetricsServlet metricsServlet;
+    private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
+
     public ProxyService(ProxyConfiguration proxyConfig,
                         AuthenticationService authenticationService) throws 
Exception {
         requireNonNull(proxyConfig);
@@ -250,6 +256,8 @@ public class ProxyService implements Closeable {
             this.serviceUrlTls = null;
         }
 
+        createMetricsServlet();
+
         // Initialize the message protocol handlers.
         // start the protocol handlers only after the broker is ready,
         // so that the protocol handlers can access broker service properly.
@@ -259,6 +267,14 @@ public class ProxyService implements Closeable {
         startProxyExtensions(protocolHandlerChannelInitializers, bootstrap);
     }
 
+    private synchronized void createMetricsServlet() {
+        this.metricsServlet = new PrometheusMetricsServlet(-1L, 
proxyConfig.getClusterName());
+        if (pendingMetricsProviders != null) {
+            pendingMetricsProviders.forEach(provider -> 
metricsServlet.addRawMetricsProvider(provider));
+            this.pendingMetricsProviders = null;
+        }
+    }
+
     // This call is used for starting additional protocol handlers
     public void startProxyExtensions(Map<String, Map<InetSocketAddress,
             ChannelInitializer<SocketChannel>>> protocolHandlers, 
ServerBootstrap serverBootstrap) {
@@ -335,6 +351,8 @@ public class ProxyService implements Closeable {
             proxyAdditionalServlets = null;
         }
 
+        resetMetricsServlet();
+
         if (localMetadataStore != null) {
             try {
                 localMetadataStore.close();
@@ -356,6 +374,10 @@ public class ProxyService implements Closeable {
         }
     }
 
+    private synchronized void resetMetricsServlet() {
+        metricsServlet = null;
+    }
+
     public String getServiceUrl() {
         return serviceUrl;
     }
@@ -414,5 +436,20 @@ public class ProxyService implements Closeable {
         return this.proxyClientAuthentication;
     }
 
+    public synchronized PrometheusMetricsServlet getMetricsServlet() {
+        return metricsServlet;
+    }
+
+    public synchronized void 
addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
+        if (metricsServlet == null) {
+            if (pendingMetricsProviders == null) {
+                pendingMetricsProviders = new LinkedList<>();
+            }
+            pendingMetricsProviders.add(metricsProvider);
+        } else {
+            this.metricsServlet.addRawMetricsProvider(metricsProvider);
+        }
+    }
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ProxyService.class);
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 32b6fa13314..4df36229b01 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -32,7 +32,6 @@ import io.netty.util.internal.PlatformDependent;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
-import io.prometheus.client.exporter.MetricsServlet;
 import io.prometheus.client.hotspot.DefaultExports;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -44,6 +43,7 @@ import 
org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import 
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
@@ -240,8 +240,13 @@ public class ProxyServiceStarter {
                                      ProxyConfiguration config,
                                      ProxyService service,
                                      BrokerDiscoveryProvider 
discoveryProvider) throws Exception {
-        server.addServlet("/metrics", new ServletHolder(MetricsServlet.class),
-                Collections.emptyList(), 
config.isAuthenticateMetricsEndpoint());
+        if (service != null) {
+            PrometheusMetricsServlet metricsServlet = 
service.getMetricsServlet();
+            if (metricsServlet != null) {
+                server.addServlet("/metrics", new 
ServletHolder(metricsServlet),
+                        Collections.emptyList(), 
config.isAuthenticateMetricsEndpoint());
+            }
+        }
         server.addRestResources("/", VipStatus.class.getPackage().getName(),
                 VipStatus.ATTRIBUTE_STATUS_FILE_PATH, 
config.getStatusFilePath());
         server.addRestResources("/proxy-stats", 
ProxyStats.class.getPackage().getName(),
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
new file mode 100644
index 00000000000..63ac43d3210
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import io.prometheus.client.Counter;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.logging.LoggingFeature;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest {
+
+    public static final String TEST_CLUSTER = "test-cluster";
+    private ProxyService proxyService;
+    private WebServer proxyWebServer;
+    private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
+        proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        proxyConfig.setClusterName(TEST_CLUSTER);
+
+        proxyService = Mockito.spy(new ProxyService(proxyConfig,
+                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig))));
+        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
+        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
+
+        proxyService.start();
+
+        proxyService.addPrometheusRawMetricsProvider(stream -> 
stream.write("test_metrics{label1=\"xyz\"} 10 \n"));
+
+        AuthenticationService authService = new AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig));
+
+        proxyWebServer = new WebServer(proxyConfig, authService);
+        ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, 
proxyService, null);
+        proxyWebServer.start();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+        proxyWebServer.stop();
+        proxyService.close();
+    }
+
+    /**
+     * Validates proxy Prometheus endpoint.
+     */
+    @Test
+    public void testMetrics() {
+        Counter.build("test_counter", "a test counter").create().register();
+
+        Client httpClient = ClientBuilder.newClient(new 
ClientConfig().register(LoggingFeature.class));
+        Response r = 
httpClient.target(proxyWebServer.getServiceUri()).path("/metrics").request()
+                .get();
+        Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode());
+        String response = r.readEntity(String.class).trim();
+
+        Multimap<String, Metric> metrics = parseMetrics(response);
+
+        // Check that ProxyService metrics are present
+        List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_proxy_binary_bytes");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), TEST_CLUSTER);
+
+        // Check that PrometheusRawMetricsProvider metrics are present
+        List<Metric> cm2 = (List<Metric>) metrics.get("test_metrics");
+        assertEquals(cm2.size(), 1);
+        assertEquals(cm2.get(0).tags.get("label1"), "xyz");
+
+        // Check that any Prometheus metric registered in the default CollectorRegistry is present
+        List<Metric> cm3 = (List<Metric>) metrics.get("test_counter");
+        assertEquals(cm3.size(), 1);
+        assertEquals(cm3.get(0).tags.get("cluster"), TEST_CLUSTER);
+    }
+
+    /**
+     * Hacky parsing of Prometheus text format. Should be good enough for unit tests.
+     */
+    public static Multimap<String, Metric> parseMetrics(String metrics) {
+        Multimap<String, Metric> parsed = ArrayListMultimap.create();
+
+        // Example lines:
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        Pattern pattern = 
Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$");
+        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+
+        Splitter.on("\n").split(metrics).forEach(line -> {
+            if (line.isEmpty() || line.startsWith("#")) {
+                return;
+            }
+
+            Matcher matcher = pattern.matcher(line);
+            assertTrue(matcher.matches(), "line " + line + " does not match 
pattern " + pattern);
+            String name = matcher.group(1);
+
+            Metric m = new Metric();
+            String numericValue = matcher.group(3);
+            if (numericValue.equalsIgnoreCase("-Inf")) {
+                m.value = Double.NEGATIVE_INFINITY;
+            } else if (numericValue.equalsIgnoreCase("+Inf")) {
+                m.value = Double.POSITIVE_INFINITY;
+            } else {
+                m.value = Double.parseDouble(numericValue);
+            }
+            String tags = matcher.group(2);
+            Matcher tagsMatcher = tagsPattern.matcher(tags);
+            while (tagsMatcher.find()) {
+                String tag = tagsMatcher.group(1);
+                String value = tagsMatcher.group(2);
+                m.tags.put(tag, value);
+            }
+
+            parsed.put(name, m);
+        });
+
+        return parsed;
+    }
+
+    static class Metric {
+        Map<String, String> tags = new TreeMap<>();
+        double value;
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("tags", 
tags).add("value", value).toString();
+        }
+    }
+
+}

Reply via email to