This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 212b85f6bf9 [improve][broker] Optimize gzip compression for /metrics
endpoint by sharing/caching compressed result (#22521)
212b85f6bf9 is described below
commit 212b85f6bf90f2e0b8232567993e43ea86525457
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 17 03:15:01 2024 -0700
[improve][broker] Optimize gzip compression for /metrics endpoint by
sharing/caching compressed result (#22521)
(cherry picked from commit 94f6c7ccd2bf8bc261d45ab41f6c7f123359fa47)
# Conflicts:
#
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
---
.../stats/prometheus/PrometheusMetricsServlet.java | 1 +
.../apache/pulsar/broker/web/GzipHandlerUtil.java | 21 +++
.../pulsar/broker/web/GzipHandlerUtilTest.java | 36 +++++
.../org/apache/pulsar/broker/PulsarService.java | 3 +-
.../prometheus/PrometheusMetricsGenerator.java | 176 +++++++++++++++++++--
.../prometheus/PulsarPrometheusMetricsServlet.java | 28 +++-
.../apache/pulsar/PrometheusMetricsTestUtil.java | 2 +-
7 files changed, 253 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 8a41bed29d4..8685348174c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PrometheusMetricsServlet extends HttpServlet {
+ public static final String DEFAULT_METRICS_PATH = "/metrics";
private static final long serialVersionUID = 1L;
static final int HTTP_STATUS_OK_200 = 200;
static final int HTTP_STATUS_INTERNAL_SERVER_ERROR_500 = 500;
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
index 37c9c05e5d5..9e980cecb79 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.broker.web;
import java.util.List;
+import org.eclipse.jetty.http.pathmap.PathSpecSet;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.util.IncludeExclude;
public class GzipHandlerUtil {
public static Handler wrapWithGzipHandler(Handler innerHandler,
List<String> gzipCompressionExcludedPaths) {
@@ -45,4 +47,23 @@ public class GzipHandlerUtil {
&& (gzipCompressionExcludedPaths.get(0).equals("^.*")
|| gzipCompressionExcludedPaths.get(0).equals("^.*$"));
}
+
+ /**
+ * Check if GZIP compression is enabled for the given endpoint.
+ * @param gzipCompressionExcludedPaths list of paths that should not be
compressed
+ * @param endpoint the endpoint to check
+ * @return true if GZIP compression is enabled for the endpoint, false
otherwise
+ */
+ public static boolean isGzipCompressionEnabledForEndpoint(List<String>
gzipCompressionExcludedPaths,
+ String endpoint)
{
+ if (gzipCompressionExcludedPaths == null ||
gzipCompressionExcludedPaths.isEmpty()) {
+ return true;
+ }
+ if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths))
{
+ return false;
+ }
+ IncludeExclude<String> paths = new IncludeExclude<>(PathSpecSet.class);
+ paths.exclude(gzipCompressionExcludedPaths.toArray(new String[0]));
+ return paths.test(endpoint);
+ }
}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
new file mode 100644
index 00000000000..d6958695dec
--- /dev/null
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/GzipHandlerUtilTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.*;
+import java.util.Arrays;
+import org.testng.annotations.Test;
+
+public class GzipHandlerUtilTest {
+
+ @Test
+ public void testIsGzipCompressionEnabledForEndpoint() {
+ assertTrue(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(null,
"/metrics"));
+
assertFalse(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(Arrays.asList("^.*"),
"/metrics"));
+
assertFalse(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(Arrays.asList("^.*$"),
"/metrics"));
+
assertFalse(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(Arrays.asList("/metrics"),
"/metrics"));
+
assertTrue(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(Arrays.asList("/metrics"),
"/metrics2"));
+
assertTrue(GzipHandlerUtil.isGzipCompressionEnabledForEndpoint(Arrays.asList("/admin",
"/custom"), "/metrics"));
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f1640bff581..93b7efc0243 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -107,6 +107,7 @@ import
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import
org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
@@ -1013,7 +1014,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
true, attributeMap, true, Topics.class);
// Add metrics servlet
- webService.addServlet("/metrics",
+ webService.addServlet(PrometheusMetricsServlet.DEFAULT_METRICS_PATH,
new ServletHolder(metricsServlet),
config.isAuthenticateMetricsEndpoint(), attributeMap);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 4c00442ab5b..bfcbb5ec89d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
@@ -44,6 +46,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.zip.CRC32;
+import java.util.zip.Deflater;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
@@ -73,7 +77,7 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
private volatile boolean closed;
public static class MetricsBuffer {
- private final CompletableFuture<ByteBuf> bufferFuture;
+ private final CompletableFuture<ResponseBuffer> bufferFuture;
private final long createTimeslot;
private final AtomicInteger refCnt = new AtomicInteger(2);
@@ -82,7 +86,7 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
createTimeslot = timeslot;
}
- public CompletableFuture<ByteBuf> getBufferFuture() {
+ public CompletableFuture<ResponseBuffer> getBufferFuture() {
return bufferFuture;
}
@@ -114,6 +118,151 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
}
}
+ /**
+ * A wraps the response buffer and asynchronously provides a gzip
compressed buffer when requested.
+ */
+ public static class ResponseBuffer {
+ private final ByteBuf uncompressedBuffer;
+ private boolean released = false;
+ private CompletableFuture<ByteBuf> compressedBuffer;
+
+ private ResponseBuffer(final ByteBuf uncompressedBuffer) {
+ this.uncompressedBuffer = uncompressedBuffer;
+ }
+
+ public ByteBuf getUncompressedBuffer() {
+ return uncompressedBuffer;
+ }
+
+ public synchronized CompletableFuture<ByteBuf>
getCompressedBuffer(Executor executor) {
+ if (released) {
+ throw new IllegalStateException("Already released!");
+ }
+ if (compressedBuffer == null) {
+ compressedBuffer = new CompletableFuture<>();
+ ByteBuf retainedDuplicate =
uncompressedBuffer.retainedDuplicate();
+ executor.execute(() -> {
+ try {
+ compressedBuffer.complete(compress(retainedDuplicate));
+ } catch (Exception e) {
+ compressedBuffer.completeExceptionally(e);
+ } finally {
+ retainedDuplicate.release();
+ }
+ });
+ }
+ return compressedBuffer;
+ }
+
+ private ByteBuf compress(ByteBuf uncompressedBuffer) {
+ GzipByteBufferWriter gzipByteBufferWriter = new
GzipByteBufferWriter(uncompressedBuffer.alloc(),
+ uncompressedBuffer.readableBytes());
+ return gzipByteBufferWriter.compress(uncompressedBuffer);
+ }
+
+ public synchronized void release() {
+ released = true;
+ uncompressedBuffer.release();
+ if (compressedBuffer != null) {
+ compressedBuffer.whenComplete((byteBuf, throwable) -> {
+ if (byteBuf != null) {
+ byteBuf.release();
+ }
+ });
+ }
+ }
+ }
+
+ /**
+ * Compress input nio buffers into gzip format with output in a Netty
composite ByteBuf.
+ */
+ private static class GzipByteBufferWriter {
+ private static final byte[] GZIP_HEADER =
+ new byte[] {(byte) 0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0,
0, 0, 0, 0, 0};
+ private final ByteBufAllocator bufAllocator;
+ private final Deflater deflater;
+ private final CRC32 crc;
+ private final int bufferSize;
+ private final CompositeByteBuf resultBuffer;
+ private ByteBuf backingCompressBuffer;
+ private ByteBuffer compressBuffer;
+
+ GzipByteBufferWriter(ByteBufAllocator bufAllocator, int readableBytes)
{
+ deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+ crc = new CRC32();
+ this.bufferSize =
Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192);
+ this.bufAllocator = bufAllocator;
+ this.resultBuffer =
bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1);
+ allocateBuffer();
+ }
+
+ /**
+ * Compress the input Netty buffer and append it to the result buffer
in gzip format.
+ * @param uncompressedBuffer
+ */
+ public ByteBuf compress(ByteBuf uncompressedBuffer) {
+ try {
+ ByteBuffer[] nioBuffers = uncompressedBuffer.nioBuffers();
+ for (int i = 0, nioBuffersLength = nioBuffers.length; i <
nioBuffersLength; i++) {
+ ByteBuffer nioBuffer = nioBuffers[i];
+ compressAndAppend(nioBuffer, i == 0, i == nioBuffersLength
- 1);
+ }
+ return resultBuffer;
+ } finally {
+ close();
+ }
+ }
+
+ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst,
boolean isLast) {
+ if (isFirst) {
+ // write gzip header
+ compressBuffer.put(GZIP_HEADER);
+ }
+ nioBuffer.mark();
+ crc.update(nioBuffer);
+ nioBuffer.reset();
+ deflater.setInput(nioBuffer);
+ if (isLast) {
+ deflater.finish();
+ }
+ while (!deflater.needsInput() && !deflater.finished()) {
+ int written = deflater.deflate(compressBuffer);
+ if (written == 0 && !compressBuffer.hasRemaining()) {
+ backingCompressBuffer.setIndex(0,
compressBuffer.position());
+ resultBuffer.addComponent(true, backingCompressBuffer);
+ allocateBuffer();
+ }
+ }
+ if (isLast) {
+ // write gzip footer, integer values are in little endian byte
order
+ compressBuffer.order(ByteOrder.LITTLE_ENDIAN);
+ // write CRC32 checksum
+ compressBuffer.putInt((int) crc.getValue());
+ // write uncompressed size
+ compressBuffer.putInt(deflater.getTotalIn());
+ // append the last compressed buffer
+ backingCompressBuffer.setIndex(0, compressBuffer.position());
+ resultBuffer.addComponent(true, backingCompressBuffer);
+ backingCompressBuffer = null;
+ compressBuffer = null;
+ }
+ }
+
+ private void allocateBuffer() {
+ backingCompressBuffer = bufAllocator.directBuffer(bufferSize);
+ compressBuffer = backingCompressBuffer.nioBuffer(0, bufferSize);
+ }
+
+ private void close() {
+ if (deflater != null) {
+ deflater.end();
+ }
+ if (backingCompressBuffer != null) {
+ backingCompressBuffer.release();
+ }
+ }
+ }
+
private final PulsarService pulsar;
private final boolean includeTopicMetrics;
private final boolean includeConsumerMetrics;
@@ -188,13 +337,7 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
// use composite buffer with pre-allocated buffers to ensure that the
pooled allocator can be used
// for allocating the buffers
ByteBufAllocator byteBufAllocator = PulsarByteBufAllocator.DEFAULT;
- int chunkSize;
- if (byteBufAllocator instanceof PooledByteBufAllocator) {
- PooledByteBufAllocator pooledByteBufAllocator =
(PooledByteBufAllocator) byteBufAllocator;
- chunkSize = Math.max(pooledByteBufAllocator.metric().chunkSize(),
DEFAULT_INITIAL_BUFFER_SIZE);
- } else {
- chunkSize = DEFAULT_INITIAL_BUFFER_SIZE;
- }
+ int chunkSize = resolveChunkSize(byteBufAllocator);
CompositeByteBuf buf = byteBufAllocator.compositeDirectBuffer(
Math.max(MINIMUM_FOR_MAX_COMPONENTS, (initialBufferSize /
chunkSize) + 1));
int totalLen = 0;
@@ -205,6 +348,17 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
return buf;
}
+ private static int resolveChunkSize(ByteBufAllocator byteBufAllocator) {
+ int chunkSize;
+ if (byteBufAllocator instanceof PooledByteBufAllocator) {
+ PooledByteBufAllocator pooledByteBufAllocator =
(PooledByteBufAllocator) byteBufAllocator;
+ chunkSize = Math.max(pooledByteBufAllocator.metric().chunkSize(),
DEFAULT_INITIAL_BUFFER_SIZE);
+ } else {
+ chunkSize = DEFAULT_INITIAL_BUFFER_SIZE;
+ }
+ return chunkSize;
+ }
+
private static void generateBrokerBasicMetrics(PulsarService pulsar,
SimpleTextOutputStream stream) {
String clusterName = pulsar.getConfiguration().getClusterName();
// generate managedLedgerCache metrics
@@ -336,10 +490,10 @@ public class PrometheusMetricsGenerator implements
AutoCloseable {
if (currentMetricsBuffer != null) {
currentMetricsBuffer.release();
}
- CompletableFuture<ByteBuf> bufferFuture =
newMetricsBuffer.getBufferFuture();
+ CompletableFuture<ResponseBuffer> bufferFuture =
newMetricsBuffer.getBufferFuture();
executor.execute(() -> {
try {
- bufferFuture.complete(generate0(metricsProviders));
+ bufferFuture.complete(new
ResponseBuffer(generate0(metricsProviders)));
} catch (Exception e) {
bufferFuture.completeExceptionally(e);
} finally {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
index 7fcc74e965c..43514d481dc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PulsarPrometheusMetricsServlet.java
@@ -18,10 +18,13 @@
*/
package org.apache.pulsar.broker.stats.prometheus;
+import static
org.apache.pulsar.broker.web.GzipHandlerUtil.isGzipCompressionEnabledForEndpoint;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Clock;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.AsyncContext;
@@ -40,6 +43,7 @@ public class PulsarPrometheusMetricsServlet extends
PrometheusMetricsServlet {
private static final int EXECUTOR_MAX_THREADS = 4;
private final PrometheusMetricsGenerator prometheusMetricsGenerator;
+ private final boolean gzipCompressionEnabledForMetrics;
public PulsarPrometheusMetricsServlet(PulsarService pulsar, boolean
includeTopicMetrics,
boolean includeConsumerMetrics,
boolean includeProducerMetrics,
@@ -50,6 +54,8 @@ public class PulsarPrometheusMetricsServlet extends
PrometheusMetricsServlet {
prometheusMetricsGenerator =
new PrometheusMetricsGenerator(pulsar, includeTopicMetrics,
includeConsumerMetrics,
includeProducerMetrics, splitTopicAndPartitionLabel,
Clock.systemUTC());
+ gzipCompressionEnabledForMetrics = isGzipCompressionEnabledForEndpoint(
+
pulsar.getConfiguration().getHttpServerGzipCompressionExcludedPaths(),
DEFAULT_METRICS_PATH);
}
@@ -100,7 +106,14 @@ public class PulsarPrometheusMetricsServlet extends
PrometheusMetricsServlet {
context.complete();
return;
}
- metricsBuffer.getBufferFuture().whenComplete((buffer, ex) ->
executor.execute(() -> {
+ boolean compressOutput = gzipCompressionEnabledForMetrics &&
isGzipAccepted(request);
+ metricsBuffer.getBufferFuture().thenCompose(responseBuffer -> {
+ if (compressOutput) {
+ return responseBuffer.getCompressedBuffer(executor);
+ } else {
+ return
CompletableFuture.completedFuture(responseBuffer.getUncompressedBuffer());
+ }
+ }).whenComplete((buffer, ex) -> executor.execute(() -> {
try {
long elapsedNanos = System.nanoTime() - startNanos;
// check if the request has been timed out, implement a soft
timeout
@@ -133,6 +146,9 @@ public class PulsarPrometheusMetricsServlet extends
PrometheusMetricsServlet {
} else {
response.setStatus(HTTP_STATUS_OK_200);
response.setContentType("text/plain;charset=utf-8");
+ if (compressOutput) {
+ response.setHeader("Content-Encoding", "gzip");
+ }
ServletOutputStream outputStream =
response.getOutputStream();
if (outputStream instanceof HttpOutput) {
HttpOutput output = (HttpOutput) outputStream;
@@ -156,4 +172,14 @@ public class PulsarPrometheusMetricsServlet extends
PrometheusMetricsServlet {
}
}));
}
+
+ private boolean isGzipAccepted(HttpServletRequest request) {
+ String acceptEncoding = request.getHeader("Accept-Encoding");
+ if (acceptEncoding != null) {
+ return Arrays.stream(acceptEncoding.split(","))
+ .map(String::trim)
+ .anyMatch(str -> "gzip".equalsIgnoreCase(str));
+ }
+ return false;
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
index fcc3b6aa88f..68826372b7b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/PrometheusMetricsTestUtil.java
@@ -55,7 +55,7 @@ public class PrometheusMetricsTestUtil {
try {
ByteBuf buffer = null;
try {
- buffer = metricsBuffer.getBufferFuture().get(5,
TimeUnit.SECONDS);
+ buffer = metricsBuffer.getBufferFuture().get(5,
TimeUnit.SECONDS).getUncompressedBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);