This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1522f0be11b0da4e3ddeabd1e6866bc5036d42b4 Author: Hang Chen <[email protected]> AuthorDate: Wed Mar 13 14:52:43 2024 +0800 [improve] [broker] Servlet support response compression (#21667) (cherry picked from commit 7a4e16a8373754a6bc4713dcfe9d06c674ce3758) --- .../org/apache/pulsar/broker/web/WebService.java | 18 ++--- .../apache/pulsar/broker/web/WebServiceTest.java | 78 +++++++++++++++++++++- 2 files changed, 85 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 57da8df5e73..a098c8b30cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -43,6 +43,7 @@ import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -253,17 +254,18 @@ public class WebService implements AutoCloseable { public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, Map<String, Object> attributeMap) { - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); // Notice: each context path should be unique, but there's nothing here to verify that - context.setContextPath(path); - context.addServlet(servletHolder, MATCH_ALL); + servletContextHandler.setContextPath(path); + servletContextHandler.addServlet(servletHolder, MATCH_ALL); if (attributeMap != null) { - 
attributeMap.forEach((key, value) -> { - context.setAttribute(key, value); - }); + attributeMap.forEach(servletContextHandler::setAttribute); } - filterInitializer.addFilters(context, requiresAuthentication); - handlers.add(context); + filterInitializer.addFilters(servletContextHandler, requiresAuthentication); + + GzipHandler gzipHandler = new GzipHandler(); + gzipHandler.setHandler(servletContextHandler); + handlers.add(gzipHandler); } public void addStaticResources(String basePath, String resourcePath) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 63d12cba688..e37b3bf2e9c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -25,11 +25,14 @@ import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient. import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; + import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.io.CharStreams; import com.google.common.io.Closeables; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -46,6 +49,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipException; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -139,7 +144,7 @@ public class WebServiceTest { @Test public void testDefaultClientVersion() throws Exception { setupEnv(true, "1.0", true, false, false, false, -1, false); - + try { // Make an HTTP request to lookup a 
namespace. The request should // succeed @@ -283,7 +288,7 @@ public class WebServiceTest { // This should have failed assertEquals(res.getStatusCode(), 405); - + builder = client.prepare("TRACK", url); res = builder.execute().get(); @@ -350,6 +355,73 @@ public class WebServiceTest { assertEquals(res.getResponseBody(), "ok"); } + @Test + public void testCompressOutputMetricsInPrometheus() throws Exception { + + setupEnv(true, "1.0", true, false, false, false, -1, false); + + String metricsUrl = pulsar.getWebServiceAddress() + "/metrics/"; + + String[] command = {"curl", "-H", "Accept-Encoding: gzip", metricsUrl}; + + ProcessBuilder processBuilder = new ProcessBuilder(command); + Process process = processBuilder.start(); + + InputStream inputStream = process.getInputStream(); + + try { + GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); + + // Process the decompressed content + StringBuilder content = new StringBuilder(); + int data; + while ((data = gzipInputStream.read()) != -1) { + content.append((char) data); + } + log.info("Response Content: {}", content); + + process.waitFor(); + assertTrue(content.toString().contains("process_cpu_seconds_total")); + } catch (IOException e) { + log.error("Failed to decompress the content, likely the content is not compressed ", e); + fail(); + } + } + + @Test + public void testUnCompressOutputMetricsInPrometheus() throws Exception { + + setupEnv(true, "1.0", true, false, false, false, -1, false); + + String metricsUrl = pulsar.getWebServiceAddress() + "/metrics/"; + + String[] command = {"curl", metricsUrl}; + + ProcessBuilder processBuilder = new ProcessBuilder(command); + Process process = processBuilder.start(); + + InputStream inputStream = process.getInputStream(); + try { + GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); + fail(); + } catch (IOException e) { + log.error("Failed to decompress the content, likely the content is not compressed ", e); + assertTrue(e instanceof 
ZipException); + } + + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + StringBuilder content = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + content.append(line + "\n"); + } + + log.info("Response Content: {}", content); + + process.waitFor(); + assertTrue(content.toString().contains("process_cpu_seconds_total")); + } + private String makeHttpRequest(boolean useTls, boolean useAuth) throws Exception { InputStream response = null; try { @@ -384,7 +456,7 @@ public class WebServiceTest { } private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowUnversionedClients, - boolean enableTls, boolean enableAuth, boolean allowInsecure, double rateLimit, + boolean enableTls, boolean enableAuth, boolean allowInsecure, double rateLimit, boolean disableTrace) throws Exception { if (pulsar != null) { throw new Exception("broker already started");
