This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8182edd7b50ef3ca5add36c5eadebeaf0c75c8e7
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Apr 15 09:55:24 2024 +0300

    [fix][broker] Optimize /metrics, fix unbounded request queue issue and fix race conditions in metricsBufferResponse mode (#22494)
    
    
    (cherry picked from commit 7009071b6d53bbc3d740ea99cdc0c010692679ab)
    
    # Conflicts:
    #       pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
    #       pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
    #       pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
    #       pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
    #       pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
    #       pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
    #       pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
---
 conf/proxy.conf                                    |   6 +-
 .../PrometheusMetricsGeneratorUtils.java           |   2 +-
 .../stats/prometheus/PrometheusMetricsServlet.java | 149 +++++++---
 .../org/apache/pulsar/broker/stats/TimeWindow.java |  94 ------
 .../org/apache/pulsar/broker/stats/WindowWrap.java |  56 ----
 .../broker/stats/prometheus/MetricsExports.java    |  68 +++++
 .../stats/prometheus/PrometheusMetricStreams.java  |   2 +-
 .../prometheus/PrometheusMetricsGenerator.java     | 328 ++++++++++++---------
 .../prometheus/PulsarPrometheusMetricsServlet.java | 140 ++++++++-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  12 +-
 .../apache/pulsar/PrometheusMetricsTestUtil.java   |  84 ++++++
 .../persistent/BucketDelayedDeliveryTest.java      |   6 +-
 .../service/persistent/PersistentTopicTest.java    |   4 +-
 .../broker/service/schema/SchemaServiceTest.java   |   4 +-
 .../pulsar/broker/stats/ConsumerStatsTest.java     |   4 +-
 .../broker/stats/MetadataStoreStatsTest.java       |   6 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 114 +++----
 .../pulsar/broker/stats/SubscriptionStatsTest.java |   4 +-
 .../apache/pulsar/broker/stats/TimeWindowTest.java |  83 ------
 .../broker/stats/TransactionMetricsTest.java       |  18 +-
 .../buffer/TransactionBufferClientTest.java        |   4 +-
 .../pendingack/PendingAckPersistentTest.java       |   4 +-
 .../apache/pulsar/broker/web/WebServiceTest.java   |   4 +-
 .../pulsar/common/util/SimpleTextOutputStream.java |  16 +-
 .../pulsar/proxy/server/ProxyConfiguration.java    |   6 +
 .../apache/pulsar/proxy/server/ProxyService.java   |   3 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  40 ++-
 27 files changed, 736 insertions(+), 525 deletions(-)

diff --git a/conf/proxy.conf b/conf/proxy.conf
index 8285e1cb753..5a9d433f39c 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -376,5 +376,7 @@ zooKeeperCacheExpirySeconds=-1
 enableProxyStatsEndpoints=true
 # Whether the '/metrics' endpoint requires authentication. Defaults to true
 authenticateMetricsEndpoint=true
-# Enable cache metrics data, default value is false
-metricsBufferResponse=false
+# Time in milliseconds that metrics endpoint would time out. Default is 30s.
+# Set it to 0 to disable timeout.
+metricsServletTimeoutMs=30000
+
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
index 828d9871bb3..077d5280b51 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorUtils.java
@@ -76,7 +76,7 @@ public class PrometheusMetricsGeneratorUtils {
                 }
                 for (int j = 0; j < sample.labelNames.size(); j++) {
                     String labelValue = sample.labelValues.get(j);
-                    if (labelValue != null) {
+                    if (labelValue != null && labelValue.indexOf('"') > -1) {
                         labelValue = labelValue.replace("\"", "\\\"");
                     }
                     if (j > 0) {
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 64d1fcdab6f..8a41bed29d4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -25,9 +25,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.servlet.ServletException;
-import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -35,67 +39,132 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PrometheusMetricsServlet extends HttpServlet {
-
     private static final long serialVersionUID = 1L;
-    private static final int HTTP_STATUS_OK_200 = 200;
-    private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
-
-    private final long metricsServletTimeoutMs;
-    private final String cluster;
+    static final int HTTP_STATUS_OK_200 = 200;
+    static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
+    protected final long metricsServletTimeoutMs;
+    protected final String cluster;
     protected List<PrometheusRawMetricsProvider> metricsProviders;
 
-    private ExecutorService executor = null;
+    protected ExecutorService executor = null;
+    protected final int executorMaxThreads;
 
     public PrometheusMetricsServlet(long metricsServletTimeoutMs, String 
cluster) {
+        this(metricsServletTimeoutMs, cluster, 1);
+    }
+
+    public PrometheusMetricsServlet(long metricsServletTimeoutMs, String 
cluster, int executorMaxThreads) {
         this.metricsServletTimeoutMs = metricsServletTimeoutMs;
         this.cluster = cluster;
+        this.executorMaxThreads = executorMaxThreads;
     }
 
     @Override
     public void init() throws ServletException {
-        executor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("prometheus-stats"));
+        if (executorMaxThreads > 0) {
+            executor =
+                    Executors.newScheduledThreadPool(executorMaxThreads, new 
DefaultThreadFactory("prometheus-stats"));
+        }
     }
 
     @Override
     protected void doGet(HttpServletRequest request, HttpServletResponse 
response) {
         AsyncContext context = request.startAsync();
-        context.setTimeout(metricsServletTimeoutMs);
-        executor.execute(() -> {
-            long start = System.currentTimeMillis();
-            HttpServletResponse res = (HttpServletResponse) 
context.getResponse();
-            try {
-                res.setStatus(HTTP_STATUS_OK_200);
-                res.setContentType("text/plain;charset=utf-8");
-                generateMetrics(cluster, res.getOutputStream());
-            } catch (Exception e) {
-                long end = System.currentTimeMillis();
-                long time = end - start;
-                if (e instanceof EOFException) {
-                    // NO STACKTRACE
-                    log.error("Failed to send metrics, "
-                            + "likely the client or this server closed "
-                            + "the connection due to a timeout ({} ms 
elapsed): {}", time, e + "");
-                } else {
-                    log.error("Failed to generate prometheus stats, {} ms 
elapsed", time, e);
+        // set hard timeout to 2 * timeout
+        if (metricsServletTimeoutMs > 0) {
+            context.setTimeout(metricsServletTimeoutMs * 2);
+        }
+        long startNanos = System.nanoTime();
+        AtomicBoolean taskStarted = new AtomicBoolean(false);
+        Future<?> future = executor.submit(() -> {
+            taskStarted.set(true);
+            long elapsedNanos = System.nanoTime() - startNanos;
+            // check if the request has been timed out, implement a soft 
timeout
+            // so that response writing can continue to up to 2 * timeout
+            if (metricsServletTimeoutMs > 0 && elapsedNanos > 
TimeUnit.MILLISECONDS.toNanos(metricsServletTimeoutMs)) {
+                log.warn("Prometheus metrics request was too long in queue 
({}ms). Skipping sending metrics.",
+                        TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
+                if (!response.isCommitted()) {
+                    response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
                 }
-                res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
-            } finally {
-                long end = System.currentTimeMillis();
-                long time = end - start;
-                try {
-                    context.complete();
-                } catch (IllegalStateException e) {
-                    // this happens when metricsServletTimeoutMs expires
-                    // java.lang.IllegalStateException: AsyncContext completed 
and/or Request lifecycle recycled
-                    log.error("Failed to generate prometheus stats, "
-                            + "this is likely due to metricsServletTimeoutMs: 
{} ms elapsed: {}", time, e + "");
+                context.complete();
+                return;
+            }
+            handleAsyncMetricsRequest(context);
+        });
+        context.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent asyncEvent) throws IOException {
+                if (!taskStarted.get()) {
+                    future.cancel(false);
                 }
             }
+
+            @Override
+            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
+                if (!taskStarted.get()) {
+                    future.cancel(false);
+                }
+                log.warn("Prometheus metrics request timed out");
+                HttpServletResponse res = (HttpServletResponse) 
context.getResponse();
+                if (!res.isCommitted()) {
+                    res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                }
+                context.complete();
+            }
+
+            @Override
+            public void onError(AsyncEvent asyncEvent) throws IOException {
+                if (!taskStarted.get()) {
+                    future.cancel(false);
+                }
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent asyncEvent) throws IOException 
{
+
+            }
         });
+
+    }
+
+    private void handleAsyncMetricsRequest(AsyncContext context) {
+        long start = System.currentTimeMillis();
+        HttpServletResponse res = (HttpServletResponse) context.getResponse();
+        try {
+            generateMetricsSynchronously(res);
+        } catch (Exception e) {
+            long end = System.currentTimeMillis();
+            long time = end - start;
+            if (e instanceof EOFException) {
+                // NO STACKTRACE
+                log.error("Failed to send metrics, "
+                        + "likely the client or this server closed "
+                        + "the connection due to a timeout ({} ms elapsed): 
{}", time, e + "");
+            } else {
+                log.error("Failed to generate prometheus stats, {} ms 
elapsed", time, e);
+            }
+            if (!res.isCommitted()) {
+                res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+            }
+        } finally {
+            long end = System.currentTimeMillis();
+            long time = end - start;
+            try {
+                context.complete();
+            } catch (IllegalStateException e) {
+                // this happens when metricsServletTimeoutMs expires
+                // java.lang.IllegalStateException: AsyncContext completed 
and/or Request lifecycle recycled
+                log.error("Failed to generate prometheus stats, "
+                        + "this is likely due to metricsServletTimeoutMs: {} 
ms elapsed: {}", time, e + "");
+            }
+        }
     }
 
-    protected void generateMetrics(String cluster, ServletOutputStream 
outputStream) throws IOException {
-        PrometheusMetricsGeneratorUtils.generate(cluster, outputStream, 
metricsProviders);
+    private void generateMetricsSynchronously(HttpServletResponse res) throws 
IOException {
+        res.setStatus(HTTP_STATUS_OK_200);
+        res.setContentType("text/plain;charset=utf-8");
+        PrometheusMetricsGeneratorUtils.generate(cluster, 
res.getOutputStream(), metricsProviders);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java
deleted file mode 100644
index 08730189322..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/TimeWindow.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats;
-
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import java.util.function.Function;
-
-public final class TimeWindow<T> {
-    private final int interval;
-    private final int sampleCount;
-    private final AtomicReferenceArray<WindowWrap<T>> array;
-
-    public TimeWindow(int sampleCount, int interval) {
-        this.sampleCount = sampleCount;
-        this.interval = interval;
-        this.array = new AtomicReferenceArray<>(sampleCount);
-    }
-
-    /**
-     * return current time window data.
-     *
-     * @param function generate data.
-     * @return
-     */
-    public synchronized WindowWrap<T> current(Function<T, T> function) {
-        long millis = System.currentTimeMillis();
-
-        if (millis < 0) {
-            return null;
-        }
-        int idx = calculateTimeIdx(millis);
-        long windowStart = calculateWindowStart(millis);
-        while (true) {
-            WindowWrap<T> old = array.get(idx);
-            if (old == null) {
-                WindowWrap<T> window = new WindowWrap<>(interval, windowStart, 
null);
-                if (array.compareAndSet(idx, null, window)) {
-                    T value = null == function ? null : function.apply(null);
-                    window.value(value);
-                    return window;
-                } else {
-                    Thread.yield();
-                }
-            } else if (windowStart == old.start()) {
-                return old;
-            } else if (windowStart > old.start()) {
-                T value = null == function ? null : 
function.apply(old.value());
-                old.value(value);
-                old.resetWindowStart(windowStart);
-                return old;
-            } else {
-                //it should never goes here
-                throw new IllegalStateException();
-            }
-        }
-    }
-
-    private int calculateTimeIdx(long timeMillis) {
-        long timeId = timeMillis / this.interval;
-        return (int) (timeId % sampleCount);
-    }
-
-    private long calculateWindowStart(long timeMillis) {
-        return timeMillis - timeMillis % this.interval;
-    }
-
-    public int sampleCount() {
-        return sampleCount;
-    }
-
-    public int interval() {
-        return interval;
-    }
-
-    public long currentWindowStart(long millis) {
-        return this.calculateWindowStart(millis);
-    }
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java
deleted file mode 100644
index 12869b82921..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/WindowWrap.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats;
-
-public final class WindowWrap<T> {
-    private final long interval;
-    private long start;
-    private T value;
-
-    public WindowWrap(long interval, long windowStart, T value) {
-        this.interval = interval;
-        this.start = windowStart;
-        this.value = value;
-    }
-
-    public long interval() {
-        return this.interval;
-    }
-
-    public long start() {
-        return this.start;
-    }
-
-    public T value() {
-        return value;
-    }
-
-    public void value(T value) {
-        this.value = value;
-    }
-
-    public WindowWrap<T> resetWindowStart(long startTime) {
-        this.start = startTime;
-        return this;
-    }
-
-    public boolean isTimeInWindow(long timeMillis) {
-        return start <= timeMillis && timeMillis < start + interval;
-    }
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java
new file mode 100644
index 00000000000..b80e5747d8a
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/MetricsExports.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.hotspot.DefaultExports;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
+
+public class MetricsExports {
+    private static boolean initialized = false;
+
+    private MetricsExports() {
+    }
+
+    public static synchronized void initialize() {
+        if (!initialized) {
+            DefaultExports.initialize();
+            register(CollectorRegistry.defaultRegistry);
+            initialized = true;
+        }
+    }
+
+    public static void register(CollectorRegistry registry) {
+        Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new 
Gauge.Child() {
+            @Override
+            public double get() {
+                return getJvmDirectMemoryUsed();
+            }
+        }).register(registry);
+
+        Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new 
Gauge.Child() {
+            @Override
+            public double get() {
+                return DirectMemoryUtils.jvmMaxDirectMemory();
+            }
+        }).register(registry);
+
+        // metric to export pulsar version info
+        Gauge.build("pulsar_version_info", "-")
+                .labelNames("version", "commit").create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return 1.0;
+                    }
+                }, PulsarVersion.getVersion(), PulsarVersion.getGitSha())
+                .register(registry);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
index 93cbad4e195..5a5a61404b8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
@@ -42,7 +42,7 @@ public class PrometheusMetricStreams {
         stream.write(metricName).write('{');
         for (int i = 0; i < labelsAndValuesArray.length; i += 2) {
             String labelValue = labelsAndValuesArray[i + 1];
-            if (labelValue != null) {
+            if (labelValue != null && labelValue.indexOf('"') > -1) {
                 labelValue = labelValue.replace("\"", "\\\"");
             }
             
stream.write(labelsAndValuesArray[i]).write("=\"").write(labelValue).write('\"');
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 124f0d3e54e..bbd09335c0a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -20,40 +20,39 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.generateSystemMetrics;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGeneratorUtils.getTypeStr;
-import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.prometheus.client.Collector;
-import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.Gauge;
-import io.prometheus.client.Gauge.Child;
-import io.prometheus.client.hotspot.DefaultExports;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.StringWriter;
+import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.stats.TimeWindow;
-import org.apache.pulsar.broker.stats.WindowWrap;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.eclipse.jetty.server.HttpOutput;
 
 /**
  * Generate metrics aggregated at the namespace level and optionally at a 
topic level and formats them out
@@ -62,123 +61,80 @@ import org.eclipse.jetty.server.HttpOutput;
  * href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
  */
 @Slf4j
-public class PrometheusMetricsGenerator {
-    private static volatile TimeWindow<ByteBuf> timeWindow;
-    private static final int MAX_COMPONENTS = 64;
-
-    static {
-        DefaultExports.initialize();
-
-        Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new 
Child() {
-            @Override
-            public double get() {
-                return getJvmDirectMemoryUsed();
-            }
-        }).register(CollectorRegistry.defaultRegistry);
-
-        Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new 
Child() {
-            @Override
-            public double get() {
-                return DirectMemoryUtils.jvmMaxDirectMemory();
-            }
-        }).register(CollectorRegistry.defaultRegistry);
-
-        // metric to export pulsar version info
-        Gauge.build("pulsar_version_info", "-")
-                .labelNames("version", "commit").create()
-                .setChild(new Child() {
-                    @Override
-                    public double get() {
-                        return 1.0;
-                    }
-                }, PulsarVersion.getVersion(), PulsarVersion.getGitSha())
-                .register(CollectorRegistry.defaultRegistry);
-    }
-
-    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
-                                boolean includeProducerMetrics, OutputStream 
out) throws IOException {
-        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, 
includeProducerMetrics, false, out, null);
-    }
-
-    public static void generate(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
-                                boolean includeProducerMetrics, boolean 
splitTopicAndPartitionIndexLabel,
-                                OutputStream out) throws IOException {
-        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, 
includeProducerMetrics,
-                splitTopicAndPartitionIndexLabel, out, null);
-    }
-
-    public static synchronized void generate(PulsarService pulsar, boolean 
includeTopicMetrics,
-                                             boolean includeConsumerMetrics, 
boolean includeProducerMetrics,
-                                             boolean 
splitTopicAndPartitionIndexLabel, OutputStream out,
-                                             
List<PrometheusRawMetricsProvider> metricsProviders) throws IOException {
-        ByteBuf buffer;
-        boolean exposeBufferMetrics = 
pulsar.getConfiguration().isMetricsBufferResponse();
+public class PrometheusMetricsGenerator implements AutoCloseable {
+    private static final int DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024; // 1MB
+    private static final int MINIMUM_FOR_MAX_COMPONENTS = 64;
+
+    private volatile MetricsBuffer metricsBuffer;
+    private static AtomicReferenceFieldUpdater<PrometheusMetricsGenerator, 
MetricsBuffer> metricsBufferFieldUpdater =
+            
AtomicReferenceFieldUpdater.newUpdater(PrometheusMetricsGenerator.class, 
MetricsBuffer.class,
+                    "metricsBuffer");
+    private volatile boolean closed;
+
+    public static class MetricsBuffer {
+        private final CompletableFuture<ByteBuf> bufferFuture;
+        private final long createTimeslot;
+        private final AtomicInteger refCnt = new AtomicInteger(2);
+
+        MetricsBuffer(long timeslot) {
+            bufferFuture = new CompletableFuture<>();
+            createTimeslot = timeslot;
+        }
 
-        if (!exposeBufferMetrics) {
-            buffer = generate0(pulsar, includeTopicMetrics, 
includeConsumerMetrics, includeProducerMetrics,
-                    splitTopicAndPartitionIndexLabel, metricsProviders);
-        } else {
-            if (null == timeWindow) {
-                int period = 
pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds();
-                timeWindow = new TimeWindow<>(1, (int) 
TimeUnit.SECONDS.toMillis(period));
-            }
-            WindowWrap<ByteBuf> window = timeWindow.current(oldBuf -> {
-                // release expired buffer, in case of memory leak
-                if (oldBuf != null && oldBuf.refCnt() > 0) {
-                    oldBuf.release();
-                    log.debug("Cached metrics buffer released");
-                }
+        public CompletableFuture<ByteBuf> getBufferFuture() {
+            return bufferFuture;
+        }
 
-                try {
-                    ByteBuf buf = generate0(pulsar, includeTopicMetrics, 
includeConsumerMetrics, includeProducerMetrics,
-                            splitTopicAndPartitionIndexLabel, 
metricsProviders);
-                    log.debug("Generated metrics buffer size {}", 
buf.readableBytes());
-                    return buf;
-                } catch (IOException e) {
-                    log.error("Generate metrics failed", e);
-                    //return empty buffer if exception happens
-                    return PulsarByteBufAllocator.DEFAULT.heapBuffer(0);
-                }
-            });
+        long getCreateTimeslot() {
+            return createTimeslot;
+        }
 
-            if (null == window || null == window.value()) {
-                return;
-            }
-            buffer = window.value();
-            log.debug("Current window start {}, current cached buf size {}", 
window.start(), buffer.readableBytes());
+        /**
+         * Retain the buffer. This is allowed, only when the buffer is not 
already released.
+         *
+         * @return true if the buffer is retained successfully, false 
otherwise.
+         */
+        boolean retain() {
+            return refCnt.updateAndGet(x -> x > 0 ? x + 1 : x) > 0;
         }
 
-        try {
-            if (out instanceof HttpOutput) {
-                HttpOutput output = (HttpOutput) out;
-                //no mem_copy and memory allocations here
-                ByteBuffer[] buffers = buffer.nioBuffers();
-                for (ByteBuffer buffer0 : buffers) {
-                    output.write(buffer0);
-                }
-            } else {
-                //read data from buffer and write it to output stream, with no 
more heap buffer(byte[]) allocation.
-                //not modify buffer readIndex/writeIndex here.
-                int readIndex = buffer.readerIndex();
-                int readableBytes = buffer.readableBytes();
-                for (int i = 0; i < readableBytes; i++) {
-                    out.write(buffer.getByte(readIndex + i));
-                }
-            }
-        } finally {
-            if (!exposeBufferMetrics && buffer.refCnt() > 0) {
-                buffer.release();
-                log.debug("Metrics buffer released.");
+        /**
+         * Release the buffer.
+         */
+        public void release() {
+            int newValue = refCnt.decrementAndGet();
+            if (newValue == 0) {
+                bufferFuture.whenComplete((byteBuf, throwable) -> {
+                    if (byteBuf != null) {
+                        byteBuf.release();
+                    }
+                });
             }
         }
     }
 
-    private static ByteBuf generate0(PulsarService pulsar, boolean 
includeTopicMetrics, boolean includeConsumerMetrics,
-                                     boolean includeProducerMetrics, boolean 
splitTopicAndPartitionIndexLabel,
-                                     List<PrometheusRawMetricsProvider> 
metricsProviders) throws IOException {
-        //Use unpooled buffers here to avoid direct buffer usage increasing.
-        //when write out 200MB data, MAX_COMPONENTS = 64 needn't mem_copy. 
see: CompositeByteBuf#consolidateIfNeeded()
-        ByteBuf buf = 
UnpooledByteBufAllocator.DEFAULT.compositeDirectBuffer(MAX_COMPONENTS);
+    private final PulsarService pulsar;
+    private final boolean includeTopicMetrics;
+    private final boolean includeConsumerMetrics;
+    private final boolean includeProducerMetrics;
+    private final boolean splitTopicAndPartitionIndexLabel;
+    private final Clock clock;
+
+    private volatile int initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;
+
+    public PrometheusMetricsGenerator(PulsarService pulsar, boolean 
includeTopicMetrics,
+                                      boolean includeConsumerMetrics, boolean 
includeProducerMetrics,
+                                      boolean 
splitTopicAndPartitionIndexLabel, Clock clock) {
+        this.pulsar = pulsar;
+        this.includeTopicMetrics = includeTopicMetrics;
+        this.includeConsumerMetrics = includeConsumerMetrics;
+        this.includeProducerMetrics = includeProducerMetrics;
+        this.splitTopicAndPartitionIndexLabel = 
splitTopicAndPartitionIndexLabel;
+        this.clock = clock;
+    }
+
+    private ByteBuf generate0(List<PrometheusRawMetricsProvider> 
metricsProviders) {
+        ByteBuf buf = allocateMultipartCompositeDirectBuffer();
         boolean exceptionHappens = false;
         //Used in namespace/topic and transaction aggregators as share metric 
names
         PrometheusMetricStreams metricStreams = new PrometheusMetricStreams();
@@ -220,10 +176,34 @@ public class PrometheusMetricsGenerator {
             //if exception happens, release buffer
             if (exceptionHappens) {
                 buf.release();
+            } else {
+                // for the next time, the initial buffer size will be 
suggested by the last buffer size
+                initialBufferSize = Math.max(DEFAULT_INITIAL_BUFFER_SIZE, 
buf.readableBytes());
             }
         }
     }
 
+    /**
+     * Allocates the composite direct buffer that a metrics response is rendered into.
+     * <p>
+     * The capacity is pre-allocated in chunk sized steps until it reaches {@code initialBufferSize}, which is
+     * the size suggested by the previously rendered response. When the allocator is a
+     * {@link PooledByteBufAllocator} the step is the allocator's chunk size (but never less than
+     * {@code DEFAULT_INITIAL_BUFFER_SIZE}); otherwise {@code DEFAULT_INITIAL_BUFFER_SIZE} is used.
+     *
+     * @return an empty composite buffer; the caller owns it and is responsible for releasing it
+     */
+    private ByteBuf allocateMultipartCompositeDirectBuffer() {
+        // use composite buffer with pre-allocated buffers to ensure that the pooled allocator can be used
+        // for allocating the buffers
+        ByteBufAllocator byteBufAllocator = PulsarByteBufAllocator.DEFAULT;
+        int chunkSize;
+        if (byteBufAllocator instanceof PooledByteBufAllocator) {
+            PooledByteBufAllocator pooledByteBufAllocator = (PooledByteBufAllocator) byteBufAllocator;
+            chunkSize = Math.max(pooledByteBufAllocator.metric().chunkSize(), DEFAULT_INITIAL_BUFFER_SIZE);
+        } else {
+            chunkSize = DEFAULT_INITIAL_BUFFER_SIZE;
+        }
+        // read the volatile size hint once so that the max component count and the loop below agree
+        int targetSize = initialBufferSize;
+        CompositeByteBuf buf = byteBufAllocator.compositeDirectBuffer(
+                Math.max(MINIMUM_FOR_MAX_COMPONENTS, (targetSize / chunkSize) + 1));
+        int totalLen = 0;
+        while (totalLen < targetSize) {
+            totalLen += chunkSize;
+            // increase the capacity in increments of chunkSize to pre-allocate the buffers in the composite
+            // buffer. Adding freshly allocated buffers with addComponent would not work since a component's
+            // length is the number of readable bytes of the added buffer, which is 0 for a new buffer.
+            buf.capacity(totalLen);
+        }
+        return buf;
+    }
+
     private static void generateBrokerBasicMetrics(PulsarService pulsar, 
SimpleTextOutputStream stream) {
         String clusterName = pulsar.getConfiguration().getClusterName();
         // generate managedLedgerCache metrics
@@ -269,12 +249,13 @@ public class PrometheusMetricsGenerator {
                         String name = key.substring(0, nameIndex);
                         value = key.substring(nameIndex + 1);
                         if (!names.contains(name)) {
-                            stream.write("# TYPE ").write(name.replace("brk_", 
"pulsar_")).write(' ')
-                                    .write(getTypeStr(metricType)).write("\n");
+                            stream.write("# TYPE ");
+                            writeNameReplacingBrkPrefix(stream, name);
+                            stream.write(' 
').write(getTypeStr(metricType)).write("\n");
                             names.add(name);
                         }
-                        stream.write(name.replace("brk_", "pulsar_"))
-                                
.write("{cluster=\"").write(cluster).write('"');
+                        writeNameReplacingBrkPrefix(stream, name);
+                        stream.write("{cluster=\"").write(cluster).write('"');
                     } catch (Exception e) {
                         continue;
                     }
@@ -283,12 +264,13 @@ public class PrometheusMetricsGenerator {
 
                     String name = entry.getKey();
                     if (!names.contains(name)) {
-                        stream.write("# TYPE 
").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
-                                .write(getTypeStr(metricType)).write('\n');
+                        stream.write("# TYPE ");
+                        writeNameReplacingBrkPrefix(stream, entry.getKey());
+                        stream.write(' 
').write(getTypeStr(metricType)).write('\n');
                         names.add(name);
                     }
-                    stream.write(name.replace("brk_", "pulsar_"))
-                            .write("{cluster=\"").write(cluster).write('"');
+                    writeNameReplacingBrkPrefix(stream, name);
+                    stream.write("{cluster=\"").write(cluster).write('"');
                 }
 
                 //to avoid quantile label duplicated
@@ -308,18 +290,98 @@ public class PrometheusMetricsGenerator {
         }
     }
 
+    /**
+     * Writes a metric name to the stream, emitting {@code pulsar_} in place of a leading {@code brk_}.
+     * Names without the {@code brk_} prefix are written unchanged.
+     *
+     * @param stream the stream to write to
+     * @param name the metric name
+     * @return the value returned by the last write call on the stream
+     */
+    private static SimpleTextOutputStream writeNameReplacingBrkPrefix(SimpleTextOutputStream stream, String name) {
+        if (!name.startsWith("brk_")) {
+            return stream.write(name);
+        }
+        // view of the name that starts right after the "brk_" prefix, avoids creating a substring
+        CharBuffer nameWithoutPrefix = CharBuffer.wrap(name);
+        nameWithoutPrefix.position("brk_".length());
+        return stream.write("pulsar_").write(nameWithoutPrefix);
+    }
+
     private static void generateManagedLedgerBookieClientMetrics(PulsarService 
pulsar, SimpleTextOutputStream stream) {
         StatsProvider statsProvider = 
pulsar.getManagedLedgerClientFactory().getStatsProvider();
         if (statsProvider instanceof NullStatsProvider) {
             return;
         }
 
-        try {
-            Writer writer = new StringWriter();
+        try (Writer writer = new OutputStreamWriter(new 
BufferedOutputStream(new OutputStream() {
+                @Override
+                public void write(int b) throws IOException {
+                    stream.writeByte(b);
+                }
+
+                @Override
+                public void write(byte b[], int off, int len) throws 
IOException {
+                    stream.write(b, off, len);
+                }
+            }), StandardCharsets.UTF_8)) {
             statsProvider.writeAllMetrics(writer);
-            stream.write(writer.toString());
         } catch (IOException e) {
-            // nop
+            log.error("Failed to write managed ledger bookie client metrics", 
e);
+        }
+    }
+
+    /**
+     * Returns a reference counted buffer holder whose future completes with the rendered metrics.
+     * <p>
+     * The current buffer is reused when one exists and has not failed. It is replaced when its future
+     * completed exceptionally, or when it is done, was created for a non-zero time slot and the current time
+     * slot is newer. Replacing the buffer schedules a new rendering on the given executor. When the
+     * metricsBufferResponse setting is disabled the time slot is 0 and the buffer is detached and released
+     * right after rendering, so only requests that arrive while rendering is in progress share it.
+     * <p>
+     * The returned buffer has been retained on behalf of the caller, which must release it after use.
+     *
+     * @param executor executor that runs the metrics rendering
+     * @param metricsProviders additional raw metrics providers passed on to the rendering
+     * @return the metrics buffer, or {@code null} when the generator is closed or the calling thread is
+     * interrupted
+     */
+    public MetricsBuffer renderToBuffer(Executor executor, 
List<PrometheusRawMetricsProvider> metricsProviders) {
+        boolean cacheMetricsResponse = 
pulsar.getConfiguration().isMetricsBufferResponse();
+        while (!closed && !Thread.currentThread().isInterrupted()) {
+            long currentTimeSlot = cacheMetricsResponse ? 
calculateCurrentTimeSlot() : 0;
+            MetricsBuffer currentMetricsBuffer = metricsBuffer;
+            if (currentMetricsBuffer == null || 
currentMetricsBuffer.getBufferFuture().isCompletedExceptionally()
+                    || (currentMetricsBuffer.getBufferFuture().isDone()
+                    && (currentMetricsBuffer.getCreateTimeslot() != 0
+                    && currentTimeSlot > 
currentMetricsBuffer.getCreateTimeslot()))) {
+                MetricsBuffer newMetricsBuffer = new 
MetricsBuffer(currentTimeSlot);
+                // only the thread that wins the compare-and-set schedules the rendering
+                if (metricsBufferFieldUpdater.compareAndSet(this, 
currentMetricsBuffer, newMetricsBuffer)) {
+                    if (currentMetricsBuffer != null) {
+                        currentMetricsBuffer.release();
+                    }
+                    CompletableFuture<ByteBuf> bufferFuture = 
newMetricsBuffer.getBufferFuture();
+                    executor.execute(() -> {
+                        try {
+                            bufferFuture.complete(generate0(metricsProviders));
+                        } catch (Exception e) {
+                            bufferFuture.completeExceptionally(e);
+                        } finally {
+                            if (currentTimeSlot == 0) {
+                                // if the buffer is not cached, release it 
after the future is completed
+                                metricsBufferFieldUpdater.compareAndSet(this, 
newMetricsBuffer, null);
+                                newMetricsBuffer.release();
+                            }
+                        }
+                    });
+                    // no need to retain before returning since the new buffer 
starts with refCnt 2
+                    return newMetricsBuffer;
+                } else {
+                    // another thread replaced the buffer first, continue with the buffer it installed
+                    currentMetricsBuffer = metricsBuffer;
+                }
+            }
+            // retain the buffer before returning
+            // if the buffer is already released, retaining won't succeed, 
retry in that case
+            if (currentMetricsBuffer != null && currentMetricsBuffer.retain()) 
{
+                return currentMetricsBuffer;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Maps the current clock time to the index of a fixed length time slot.
+     * The slot length is the configured managed ledger stats period, with a minimum of one second. Cached
+     * metrics are therefore refreshed at slot boundaries, independently of when requests arrive.
+     *
+     * @return the current time in milliseconds divided by the slot length in milliseconds
+     */
+    private long calculateCurrentTimeSlot() {
+        long slotLengthSeconds = Math.max(1, pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds());
+        long slotLengthMillis = TimeUnit.SECONDS.toMillis(slotLengthSeconds);
+        return clock.millis() / slotLengthMillis;
+    }
+
+    /**
+     * Marks the generator as closed and releases the currently held metrics buffer, if there is one.
+     * The buffer field is cleared atomically so that the held reference is released only once.
+     */
+    @Override
+    public void close() {
+        closed = true;
+        MetricsBuffer buffer = metricsBufferFieldUpdater.getAndSet(this, null);
+        if (buffer != null) {
+            buffer.release();
         }
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
index 42bd2652883..7fcc74e965c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
@@ -18,34 +18,142 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
+import org.eclipse.jetty.server.HttpOutput;
 
+/**
+ * Servlet that serves the broker's Prometheus metrics.
+ * <p>
+ * Requests are handled asynchronously: the metrics are rendered by a {@link PrometheusMetricsGenerator}
+ * and written to the response on the servlet's executor once the rendered buffer is available.
+ */
+@Slf4j
 public class PulsarPrometheusMetricsServlet extends PrometheusMetricsServlet {
-
     private static final long serialVersionUID = 1L;
+    private static final int EXECUTOR_MAX_THREADS = 4;
 
-    private final PulsarService pulsar;
-    private final boolean shouldExportTopicMetrics;
-    private final boolean shouldExportConsumerMetrics;
-    private final boolean shouldExportProducerMetrics;
-    private final boolean splitTopicAndPartitionLabel;
+    private final PrometheusMetricsGenerator prometheusMetricsGenerator;
 
     public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics,
-                                          boolean includeConsumerMetrics, boolean shouldExportProducerMetrics,
+                                          boolean includeConsumerMetrics, boolean includeProducerMetrics,
                                           boolean splitTopicAndPartitionLabel) {
-        super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), pulsar.getConfiguration().getClusterName());
-        this.pulsar = pulsar;
-        this.shouldExportTopicMetrics = includeTopicMetrics;
-        this.shouldExportConsumerMetrics = includeConsumerMetrics;
-        this.shouldExportProducerMetrics = shouldExportProducerMetrics;
-        this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
+        super(pulsar.getConfiguration().getMetricsServletTimeoutMs(), pulsar.getConfiguration().getClusterName(),
+                EXECUTOR_MAX_THREADS);
+        MetricsExports.initialize();
+        prometheusMetricsGenerator =
+                new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, includeConsumerMetrics,
+                        includeProducerMetrics, splitTopicAndPartitionLabel, Clock.systemUTC());
     }
 
+
     @Override
-    protected void generateMetrics(String cluster, ServletOutputStream outputStream) throws IOException {
-        PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
-                shouldExportProducerMetrics, splitTopicAndPartitionLabel, outputStream, metricsProviders);
+    public void destroy() {
+        super.destroy();
+        // closing the generator releases the metrics buffer that it may still hold
+        prometheusMetricsGenerator.close();
+    }
+
+    /**
+     * Serves the metrics asynchronously.
+     * <p>
+     * Two timeouts apply when {@code metricsServletTimeoutMs} is positive: a hard timeout of twice that value
+     * on the async context, and a soft timeout of that value measured when the response writing task starts.
+     * A request that exceeds the soft timeout is answered with status 500 instead of the metrics.
+     *
+     * @param request the HTTP request
+     * @param response the HTTP response that the metrics are written to
+     */
+    @Override
+    protected void doGet(HttpServletRequest request, HttpServletResponse response) {
+        AsyncContext context = request.startAsync();
+        // set hard timeout to 2 * timeout
+        if (metricsServletTimeoutMs > 0) {
+            context.setTimeout(metricsServletTimeoutMs * 2);
+        }
+        long startNanos = System.nanoTime();
+        // set by the listener when the request timed out or failed, checked before writing the response
+        AtomicBoolean skipWritingResponse = new AtomicBoolean(false);
+        context.addListener(new AsyncListener() {
+            @Override
+            public void onComplete(AsyncEvent event) throws IOException {
+            }
+
+            @Override
+            public void onTimeout(AsyncEvent event) throws IOException {
+                log.warn("Prometheus metrics request timed out");
+                skipWritingResponse.set(true);
+                HttpServletResponse res = (HttpServletResponse) context.getResponse();
+                if (!res.isCommitted()) {
+                    res.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                }
+                context.complete();
+            }
+
+            @Override
+            public void onError(AsyncEvent event) throws IOException {
+                skipWritingResponse.set(true);
+            }
+
+            @Override
+            public void onStartAsync(AsyncEvent event) throws IOException {
+            }
+        });
+        PrometheusMetricsGenerator.MetricsBuffer metricsBuffer =
+                prometheusMetricsGenerator.renderToBuffer(executor, metricsProviders);
+        if (metricsBuffer == null) {
+            log.info("Service is closing, skip writing metrics.");
+            response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+            context.complete();
+            return;
+        }
+        // the response is written on the executor after the buffer future has completed
+        metricsBuffer.getBufferFuture().whenComplete((buffer, ex) -> executor.execute(() -> {
+            try {
+                long elapsedNanos = System.nanoTime() - startNanos;
+                // check if the request has been timed out, implement a soft timeout
+                // so that response writing can continue to up to 2 * timeout
+                if (metricsServletTimeoutMs > 0 && elapsedNanos > TimeUnit.MILLISECONDS.toNanos(
+                        metricsServletTimeoutMs)) {
+                    log.warn("Prometheus metrics request was too long in queue ({}ms). Skipping sending metrics.",
+                            TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
+                    if (!response.isCommitted() && !skipWritingResponse.get()) {
+                        response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                    }
+                    return;
+                }
+                if (skipWritingResponse.get()) {
+                    log.warn("Response has timed out or failed, skip writing metrics.");
+                    return;
+                }
+                if (response.isCommitted()) {
+                    log.warn("Response is already committed, cannot write metrics");
+                    return;
+                }
+                if (ex != null) {
+                    log.error("Failed to generate metrics", ex);
+                    response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                    return;
+                }
+                if (buffer == null) {
+                    log.error("Failed to generate metrics, buffer is null");
+                    response.setStatus(HTTP_STATUS_INTERNAL_SERVER_ERROR_500);
+                } else {
+                    response.setStatus(HTTP_STATUS_OK_200);
+                    response.setContentType("text/plain;charset=utf-8");
+                    ServletOutputStream outputStream = response.getOutputStream();
+                    if (outputStream instanceof HttpOutput) {
+                        // Jetty's output accepts NIO buffers, write the parts of the buffer one by one
+                        HttpOutput output = (HttpOutput) outputStream;
+                        for (ByteBuffer nioBuffer : buffer.nioBuffers()) {
+                            output.write(nioBuffer);
+                        }
+                    } else {
+                        int length = buffer.readableBytes();
+                        if (length > 0) {
+                            // read from a duplicate so that the indexes of the shared buffer stay untouched
+                            buffer.duplicate().readBytes(outputStream, length);
+                        }
+                    }
+                }
+            } catch (EOFException e) {
+                log.error("Failed to write metrics to response due to EOFException");
+            } catch (IOException e) {
+                log.error("Failed to write metrics to response", e);
+            } finally {
+                // release the reference that renderToBuffer retained for this request
+                metricsBuffer.release();
+                context.complete();
+            }
+        }));
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index dda03e3e59d..33ef05df9eb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -475,7 +475,9 @@ class TopicStats {
     static void writeTopicMetric(PrometheusMetricStreams stream, String 
metricName, Number value, String cluster,
                                  String namespace, String topic, boolean 
splitTopicAndPartitionIndexLabel,
                                  String... extraLabelsAndValues) {
-        String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel 
? 8 : 6];
+        int baseLabelCount = splitTopicAndPartitionIndexLabel ? 8 : 6;
+        String[] labelsAndValues =
+                new String[baseLabelCount + (extraLabelsAndValues != null ? 
extraLabelsAndValues.length : 0)];
         labelsAndValues[0] = "cluster";
         labelsAndValues[1] = cluster;
         labelsAndValues[2] = "namespace";
@@ -495,7 +497,11 @@ class TopicStats {
         } else {
             labelsAndValues[5] = topic;
         }
-        String[] labels = ArrayUtils.addAll(labelsAndValues, 
extraLabelsAndValues);
-        stream.writeSample(metricName, value, labels);
+        if (extraLabelsAndValues != null) {
+            for (int i = 0; i < extraLabelsAndValues.length; i++) {
+                labelsAndValues[baseLabelCount + i] = extraLabelsAndValues[i];
+            }
+        }
+        stream.writeSample(metricName, value, labelsAndValues);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
new file mode 100644
index 00000000000..fcc3b6aa88f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.eclipse.jetty.server.HttpOutput;
+
+/**
+ * Test helper that renders Prometheus metrics synchronously into an {@link OutputStream}.
+ */
+public class PrometheusMetricsTestUtil {
+    /**
+     * Renders the broker metrics into the given stream without splitting the topic and partition index label.
+     *
+     * @throws IOException if rendering fails, times out or the calling thread is interrupted
+     */
+    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
+                                boolean includeProducerMetrics, OutputStream out) throws IOException {
+        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out);
+    }
+
+    /**
+     * Renders the broker metrics into the given stream using a new generator created with the given flags.
+     *
+     * @throws IOException if rendering fails, times out or the calling thread is interrupted
+     */
+    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
+                                boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+                                OutputStream out) throws IOException {
+        generate(new PrometheusMetricsGenerator(pulsar, includeTopicMetrics, includeConsumerMetrics,
+                includeProducerMetrics, splitTopicAndPartitionIndexLabel, Clock.systemUTC()), out, null);
+    }
+
+    /**
+     * Renders metrics with the given generator on the calling thread and copies them to the stream.
+     * Waits up to 5 seconds for the rendered buffer. Nothing is written when the rendered buffer is null.
+     *
+     * @param metricsGenerator the generator to render with
+     * @param out the stream that receives the rendered metrics
+     * @param metricsProviders additional raw metrics providers passed to the generator, may be null
+     * @throws IOException if no metrics buffer is available, rendering fails or times out, or the calling
+     * thread is interrupted
+     */
+    public static void generate(PrometheusMetricsGenerator metricsGenerator, OutputStream out,
+                                List<PrometheusRawMetricsProvider> metricsProviders) throws IOException {
+        PrometheusMetricsGenerator.MetricsBuffer metricsBuffer =
+                metricsGenerator.renderToBuffer(MoreExecutors.directExecutor(), metricsProviders);
+        if (metricsBuffer == null) {
+            // renderToBuffer returns null when the generator is closed or the thread is interrupted
+            throw new IOException("No metrics buffer available, generator is closed or thread is interrupted");
+        }
+        try {
+            ByteBuf buffer = null;
+            try {
+                buffer = metricsBuffer.getBufferFuture().get(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // restore the interrupt status for the caller before reporting the failure
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            } catch (ExecutionException | TimeoutException e) {
+                throw new IOException(e);
+            }
+            if (buffer == null) {
+                return;
+            }
+            if (out instanceof HttpOutput) {
+                HttpOutput output = (HttpOutput) out;
+                ByteBuffer[] nioBuffers = buffer.nioBuffers();
+                for (ByteBuffer nioBuffer : nioBuffers) {
+                    output.write(nioBuffer);
+                }
+            } else {
+                int length = buffer.readableBytes();
+                if (length > 0) {
+                    // read from a duplicate so that the indexes of the rendered buffer stay untouched
+                    buffer.duplicate().readBytes(out, length);
+                }
+            }
+        } finally {
+            metricsBuffer.release();
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 54fec3934dd..981feb00287 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -38,11 +38,11 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -217,7 +217,7 @@ public class BucketDelayedDeliveryTest extends 
DelayedDeliveryTest {
         Thread.sleep(2000);
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, true, true, output);
+        PrometheusMetricsTestUtil.generate(pulsar, true, true, true, output);
         String metricsStr = output.toString(StandardCharsets.UTF_8);
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = 
PrometheusMetricsTest.parseMetrics(metricsStr);
 
@@ -303,7 +303,7 @@ public class BucketDelayedDeliveryTest extends 
DelayedDeliveryTest {
         assertEquals(opLatencyMetricsSum.intValue(), 
opLatencyTopicMetrics.get().value);
 
         ByteArrayOutputStream namespaceOutput = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, true, true, 
namespaceOutput);
+        PrometheusMetricsTestUtil.generate(pulsar, false, true, true, 
namespaceOutput);
         Multimap<String, PrometheusMetricsTest.Metric> namespaceMetricsMap = 
PrometheusMetricsTest.parseMetrics(namespaceOutput.toString(StandardCharsets.UTF_8));
 
         Optional<PrometheusMetricsTest.Metric> namespaceMetric =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 4c0d8eb6a49..5c49b472303 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -63,12 +63,12 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -366,7 +366,7 @@ public class PersistentTopicTest extends BrokerTestBase {
 
         latch.await(10, TimeUnit.SECONDS);
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
+        PrometheusMetricsTestUtil.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
         String metricsStr = output.toString(StandardCharsets.UTF_8);
 
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = 
PrometheusMetricsTest.parseMetrics(metricsStr);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index fbf734f331f..2d8b610e04d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -41,12 +41,12 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.naming.TopicName;
@@ -120,7 +120,7 @@ public class SchemaServiceTest extends 
MockedPulsarServiceBaseTest {
         deleteSchema(schemaId, version(1));
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
output);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
output);
         output.flush();
         String metricsStr = output.toString(StandardCharsets.UTF_8);
         Multimap<String, PrometheusMetricsTest.Metric> metrics = 
PrometheusMetricsTest.parseMetrics(metricsStr);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index f29c643a8f5..de65d5db564 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
@@ -50,7 +51,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -333,7 +333,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase 
{
         consumer2.updateRates();
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
+        PrometheusMetricsTestUtil.generate(pulsar, exposeTopicLevelMetrics, 
true, true, output);
         String metricStr = output.toString(StandardCharsets.UTF_8);
 
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = 
PrometheusMetricsTest.parseMetrics(metricStr);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index 8ae0242c623..7368d42355c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -28,10 +28,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -99,7 +99,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
false, output);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, false, 
output);
         String metricsStr = output.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = 
PrometheusMetricsTest.parseMetrics(metricsStr);
 
@@ -189,7 +189,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
false, output);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, false, 
output);
         String metricsStr = output.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = 
PrometheusMetricsTest.parseMetrics(metricsStr);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index abd00d374f3..4bf0e6eb6ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.broker.stats;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -35,6 +38,7 @@ import java.lang.reflect.Field;
 import java.math.RoundingMode;
 import java.nio.charset.StandardCharsets;
 import java.text.NumberFormat;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -52,6 +56,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -62,6 +67,7 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -84,7 +90,6 @@ import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.zookeeper.CreateMode;
 import org.awaitility.Awaitility;
-import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -154,7 +159,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         });
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         assertTrue(metrics.containsKey("pulsar_publish_rate_limit_times"));
@@ -184,7 +189,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         @Cleanup
         ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut2);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut2);
         String metricsStr2 = statsOut2.toString();
         Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
         assertTrue(metrics2.containsKey("pulsar_publish_rate_limit_times"));
@@ -216,7 +221,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         Collection<Metric> metric = metrics.get("pulsar_topics_count");
@@ -253,7 +258,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         producerInServer.getStats().msgThroughputIn = 100;
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, true, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, true, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         assertTrue(metrics.containsKey("pulsar_average_msg_size"));
@@ -296,7 +301,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -394,7 +399,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -503,7 +508,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         c2.close();
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -581,7 +586,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         // includeTopicMetric true
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
@@ -613,7 +618,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         // includeTopicMetric false
         ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut2);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut2);
         String metricsStr2 = statsOut2.toString();
         Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
 
@@ -697,7 +702,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         Awaitility.await().until(() -> sub2.getExpiredMessageRate() != 0.0);
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         // There should be 2 metrics with different tags for each topic
@@ -778,7 +783,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         loadManager.getLoadManager().updateLocalBrokerData();
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_in"));
@@ -823,7 +828,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         assertTrue(metrics.containsKey("pulsar_subscription_back_log"));
@@ -870,7 +875,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -943,7 +948,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, true, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, true, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1011,7 +1016,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, true, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, true, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1098,7 +1103,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Map<String, String> typeDefs = new HashMap<>();
@@ -1202,7 +1207,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1238,7 +1243,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1316,7 +1321,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1397,7 +1402,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         });
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_authentication_success_total");
@@ -1458,7 +1463,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_expired_token_total");
@@ -1499,7 +1504,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         Metric countMetric = ((List<Metric>) 
metrics.get("pulsar_expiring_token_minutes_count")).get(0);
@@ -1573,7 +1578,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         // enable ExposeManagedCursorMetricsInPrometheus
         
pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(true);
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -1586,7 +1591,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         // disable ExposeManagedCursorMetricsInPrometheus
         
pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(false);
         ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut2);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut2);
         String metricsStr2 = statsOut2.toString();
         Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
         List<Metric> cm2 = (List<Metric>) 
metrics2.get("pulsar_ml_cursor_persistLedgerSucceed");
@@ -1605,7 +1610,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                 .create();
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_connection_created_total_count");
@@ -1622,7 +1627,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
 
         pulsarClient.close();
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
 
         metrics = parseMetrics(metricsStr);
@@ -1645,7 +1650,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
 
         pulsarClient.close();
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
 
         metrics = parseMetrics(metricsStr);
@@ -1689,7 +1694,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_compaction_removed_event_count");
@@ -1724,7 +1729,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         Compactor compactor = 
((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
         compactor.compact(topicName).get();
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
         cm = (List<Metric>) 
metrics.get("pulsar_compaction_removed_event_count");
@@ -1757,31 +1762,36 @@ public class PrometheusMetricsTest extends 
BrokerTestBase {
 
     @Test
     public void testMetricsWithCache() throws Throwable {
-        ServiceConfiguration configuration = 
Mockito.mock(ServiceConfiguration.class);
-        
Mockito.when(configuration.getManagedLedgerStatsPeriodSeconds()).thenReturn(2);
-        Mockito.when(configuration.isMetricsBufferResponse()).thenReturn(true);
-        
Mockito.when(configuration.getClusterName()).thenReturn(configClusterName);
-        Mockito.when(pulsar.getConfiguration()).thenReturn(configuration);
+        ServiceConfiguration configuration = pulsar.getConfiguration();
+        configuration.setManagedLedgerStatsPeriodSeconds(2);
+        configuration.setMetricsBufferResponse(true);
+        configuration.setClusterName(configClusterName);
 
-        int period = 
pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds();
-        TimeWindow<Object> timeWindow = new TimeWindow<>(2, (int) 
TimeUnit.SECONDS.toMillis(period));
+        // create a mock clock to control the time
+        AtomicLong currentTimeMillis = new 
AtomicLong(System.currentTimeMillis());
+        Clock clock = mock();
+        when(clock.millis()).thenAnswer(invocation -> currentTimeMillis.get());
 
+        PrometheusMetricsGenerator prometheusMetricsGenerator =
+                new PrometheusMetricsGenerator(pulsar, true, false, false,
+                        false, clock);
+
+        String previousMetrics = null;
         for (int a = 0; a < 4; a++) {
-            long start = System.currentTimeMillis();
             ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream();
-            PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
false, statsOut1, null);
+            PrometheusMetricsTestUtil.generate(prometheusMetricsGenerator, 
statsOut1, null);
             ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
-            PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
false, statsOut2, null);
-            long end = System.currentTimeMillis();
-
-            if (timeWindow.currentWindowStart(start) == 
timeWindow.currentWindowStart(end)) {
-                String metricsStr1 = statsOut1.toString();
-                String metricsStr2 = statsOut2.toString();
-                assertEquals(metricsStr1, metricsStr2);
-                Multimap<String, Metric> metrics = parseMetrics(metricsStr1);
-            }
+            PrometheusMetricsTestUtil.generate(prometheusMetricsGenerator, 
statsOut2, null);
+
+            String metricsStr1 = statsOut1.toString();
+            String metricsStr2 = statsOut2.toString();
+            assertTrue(metricsStr1.length() > 1000);
+            assertEquals(metricsStr1, metricsStr2);
+            assertNotEquals(metricsStr1, previousMetrics);
+            previousMetrics = metricsStr1;
 
-            Thread.sleep(TimeUnit.SECONDS.toMillis(period / 2));
+            // move time forward
+            currentTimeMillis.addAndGet(TimeUnit.SECONDS.toMillis(2));
         }
     }
 
@@ -1809,7 +1819,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                 .subscribe();
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, true,  
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, true,  
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
         Collection<Metric> metric = metrics.get("pulsar_consumers_count");
@@ -1845,7 +1855,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Pattern typePattern = 
Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
@@ -1961,7 +1971,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
                 .subscribe();
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false,
+        PrometheusMetricsTestUtil.generate(pulsar, true, false,
                 false, statsOut);
         String metricsStr = statsOut.toString();
         final List<String> subCountLines = metricsStr.lines()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index d5e0066a86f..83e6f43cbaf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -29,13 +29,13 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.EntryFilterSupport;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.EntryFilterTest;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -231,7 +231,7 @@ public class SubscriptionStatsTest extends 
ProducerConsumerBase {
         }
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, 
false, output);
+        PrometheusMetricsTestUtil.generate(pulsar, enableTopicStats, false, 
false, output);
         String metricsStr = output.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metrics = 
PrometheusMetricsTest.parseMetrics(metricsStr);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java
deleted file mode 100644
index 89528c19653..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TimeWindowTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import org.testng.annotations.Test;
-
-public class TimeWindowTest {
-
-    @Test
-    public void windowTest() throws Exception {
-        int intervalInMs = 1000;
-        int sampleCount = 2;
-        TimeWindow<Integer> timeWindow = new TimeWindow<>(sampleCount, 
intervalInMs);
-
-        WindowWrap<Integer> expect1 = timeWindow.current(oldValue -> 1);
-        WindowWrap<Integer> expect2 = timeWindow.current(oldValue -> null);
-        assertNotNull(expect1);
-        assertNotNull(expect2);
-
-        if (expect1.start() == expect2.start()) {
-            assertEquals((int) expect1.value(), 1);
-            assertEquals(expect1, expect2);
-            assertEquals(expect1.value(), expect2.value());
-        }
-
-        Thread.sleep(intervalInMs);
-
-        WindowWrap<Integer> expect3 = timeWindow.current(oldValue -> 2);
-        WindowWrap<Integer> expect4 = timeWindow.current(oldValue -> null);
-        assertNotNull(expect3);
-        assertNotNull(expect4);
-
-        if (expect3.start() == expect4.start()) {
-            assertEquals((int) expect3.value(), 2);
-            assertEquals(expect3, expect4);
-            assertEquals(expect3.value(), expect4.value());
-        }
-
-        Thread.sleep(intervalInMs);
-
-        WindowWrap<Integer> expect5 = timeWindow.current(oldValue -> 3);
-        WindowWrap<Integer> expect6 = timeWindow.current(oldValue -> null);
-        assertNotNull(expect5);
-        assertNotNull(expect6);
-
-        if (expect5.start() == expect6.start()) {
-            assertEquals((int) expect5.value(), 3);
-            assertEquals(expect5, expect6);
-            assertEquals(expect5.value(), expect6.value());
-        }
-
-        Thread.sleep(intervalInMs);
-
-        WindowWrap<Integer> expect7 = timeWindow.current(oldValue -> 4);
-        WindowWrap<Integer> expect8 = timeWindow.current(oldValue -> null);
-        assertNotNull(expect7);
-        assertNotNull(expect8);
-
-        if (expect7.start() == expect8.start()) {
-            assertEquals((int) expect7.value(), 4);
-            assertEquals(expect7, expect8);
-            assertEquals(expect7.value(), expect8.value());
-        }
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 4d38f5fad51..85c6dd795d7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -37,9 +37,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -117,7 +117,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         pulsar.getTransactionMetadataStoreService().getStores()
                 .get(transactionCoordinatorIDTwo).newTransaction(timeout, 
null).get();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metrics = 
parseMetrics(metricsStr);
         Collection<PrometheusMetricsTest.Metric> metric = 
metrics.get("pulsar_txn_active_count");
@@ -185,7 +185,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         pulsar.getBrokerService().updateRates();
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metrics = 
parseMetrics(metricsStr);
 
@@ -215,7 +215,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         });
 
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
 
@@ -271,7 +271,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         producer.send("hello pulsar".getBytes());
         consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
transaction).get();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, PrometheusMetricsTest.Metric> metrics = 
parseMetrics(metricsStr);
@@ -289,7 +289,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         
checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 
126, metric);
 
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, 
statsOut);
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
         metric = metrics.get("pulsar_storage_size");
@@ -333,7 +333,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         producer.send("hello pulsar".getBytes());
         consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut);
         String metricsStr = statsOut.toString();
 
         Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);
@@ -358,7 +358,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
 
         statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut);
         metricsStr = statsOut.toString();
         metrics = parseMetrics(metricsStr);
         metric = metrics.get("pulsar_storage_size");
@@ -392,7 +392,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
                     .send();
         }
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut);
         String metricsStr = statsOut.toString();
 
         Map<String, String> typeDefs = new HashMap<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 81ff95be3bb..a6b05ada6c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -37,10 +37,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
@@ -229,7 +229,7 @@ public class TransactionBufferClientTest extends TransactionTestBase {
 
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsarServiceList.get(0), true, false, false, statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 2e154715ac9..7f6750a0e36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -48,13 +48,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -336,7 +336,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {
 
         @Cleanup
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsarServiceList.get(0), true, false, false, statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 7280a3de279..9c5b031a41b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -53,10 +53,10 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import lombok.Cleanup;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
-import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -106,7 +106,7 @@ public class WebServiceTest {
     public void testWebExecutorMetrics() throws Exception {
         setupEnv(true, false, false, false, -1, false);
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut);
         String metricsStr = statsOut.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index c8c639606aa..9bf6302f50f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.util;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.CharsetUtil;
+import java.nio.CharBuffer;
 
 /**
  * Format strings and numbers into a ByteBuf without any memory allocation.
@@ -28,6 +29,7 @@ public class SimpleTextOutputStream {
     private final ByteBuf buffer;
     private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
             'f'};
+    private final CharBuffer singleCharBuffer = CharBuffer.allocate(1);
 
     public SimpleTextOutputStream(ByteBuf buffer) {
         this.buffer = buffer;
@@ -44,11 +46,17 @@ public class SimpleTextOutputStream {
     }
 
     public SimpleTextOutputStream write(char c) {
-        write(String.valueOf(c));
+        //  In UTF-8, any character from U+0000 to U+007F is encoded in one byte
+        if (c <= '\u007F') {
+            buffer.writeByte((byte) c);
+            return this;
+        }
+        singleCharBuffer.put(0, c);
+        buffer.writeCharSequence(singleCharBuffer, CharsetUtil.UTF_8);
         return this;
     }
 
-    public SimpleTextOutputStream write(String s) {
+    public SimpleTextOutputStream write(CharSequence s) {
         if (s == null) {
             return this;
         }
@@ -136,4 +144,8 @@ public class SimpleTextOutputStream {
     public ByteBuf getBuffer() {
         return buffer;
     }
+
+    public void writeByte(int b) {
+        buffer.writeByte(b);
+    }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index db2969e3c39..39c8fb5e086 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -392,6 +392,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private boolean authenticateMetricsEndpoint = true;
 
+    @FieldContext(
+            category = CATEGORY_HTTP,
+            doc = "Time in milliseconds that metrics endpoint would time out. Default is 30s.\n"
+                    + " Set it to 0 to disable timeout."
+    )
+    private long metricsServletTimeoutMs = 30000;
 
     @FieldContext(
         category = CATEGORY_SASL_AUTH,
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 719c7c2cbda..b6d36232d88 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -295,7 +295,8 @@ public class ProxyService implements Closeable {
     }
 
     private synchronized void createMetricsServlet() {
-        this.metricsServlet = new PrometheusMetricsServlet(-1L, proxyConfig.getClusterName());
+        this.metricsServlet =
+                new PrometheusMetricsServlet(proxyConfig.getMetricsServletTimeoutMs(), proxyConfig.getClusterName());
         if (pendingMetricsProviders != null) {
             pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
             this.pendingMetricsProviders = null;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 1a98601f2a9..8ac60b21a11 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -26,6 +26,7 @@ import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Gauge.Child;
@@ -234,21 +235,36 @@ public class ProxyServiceStarter {
         if (!metricsInitialized) {
             // Setup metrics
             DefaultExports.initialize();
+            CollectorRegistry registry = CollectorRegistry.defaultRegistry;
 
             // Report direct memory from Netty counters
-            Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
-                @Override
-                public double get() {
-                    return getJvmDirectMemoryUsed();
-                }
-            }).register(CollectorRegistry.defaultRegistry);
+            Collector jvmMemoryDirectBytesUsed =
+                    Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
+                        @Override
+                        public double get() {
+                            return getJvmDirectMemoryUsed();
+                        }
+                    });
+            try {
+                registry.register(jvmMemoryDirectBytesUsed);
+            } catch (IllegalArgumentException e) {
+                // workaround issue in tests where the metric is already registered
+                log.debug("Failed to register jvm_memory_direct_bytes_used metric: {}", e.getMessage());
+            }
 
-            Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
-                @Override
-                public double get() {
-                    return DirectMemoryUtils.jvmMaxDirectMemory();
-                }
-            }).register(CollectorRegistry.defaultRegistry);
+            Collector jvmMemoryDirectBytesMax =
+                    Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
+                        @Override
+                        public double get() {
+                            return DirectMemoryUtils.jvmMaxDirectMemory();
+                        }
+                    });
+            try {
+                registry.register(jvmMemoryDirectBytesMax);
+            } catch (IllegalArgumentException e) {
+                // workaround issue in tests where the metric is already registered
+                log.debug("Failed to register jvm_memory_direct_bytes_max metric: {}", e.getMessage());
+            }
 
             metricsInitialized = true;
         }

Reply via email to