This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7984cc2f93f [feat][admin] Enable Gzip Compression by Default in Admin Client (#22464)
7984cc2f93f is described below
commit 7984cc2f93f8dc85b598ded1167508eae4ee06ec
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Apr 12 05:56:55 2024 -0700
[feat][admin] Enable Gzip Compression by Default in Admin Client (#22464)
---
.../pulsar/client/admin/PulsarAdminBuilder.java | 11 +-
pulsar-client-admin/pom.xml | 7 ++
.../admin/internal/PulsarAdminBuilderImpl.java | 22 +++-
.../client/admin/internal/PulsarAdminImpl.java | 8 +-
.../admin/internal/http/AsyncHttpConnector.java | 14 ++-
.../internal/http/AsyncHttpConnectorProvider.java | 9 +-
.../client/admin/internal/PulsarAdminGzipTest.java | 122 +++++++++++++++++++++
7 files changed, 183 insertions(+), 10 deletions(-)
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index 1260555a7c4..1b025a752d9 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -327,4 +327,13 @@ public interface PulsarAdminBuilder {
*/
PulsarAdminBuilder setContextClassLoader(ClassLoader
clientBuilderClassLoader);
-}
+ /**
+ * Determines whether to include the "Accept-Encoding: gzip" header in
HTTP requests.
+ * By default, the "Accept-Encoding: gzip" header is included in HTTP
requests.
+ * If this is set to false, the "Accept-Encoding: gzip" header will not be
included in the requests.
+ *
+ * @param acceptGzipCompression A flag that indicates whether to include
the "Accept-Encoding: gzip" header in HTTP
+ * requests
+ */
+ PulsarAdminBuilder acceptGzipCompression(boolean acceptGzipCompression);
+}
\ No newline at end of file
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index a8e4823f9f6..657be0513e5 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -113,6 +113,13 @@
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock-jre8</artifactId>
+ <version>${wiremock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 009fa67fbaa..f7b1695f5f3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -38,10 +38,11 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
protected ClientConfigurationData conf;
private ClassLoader clientBuilderClassLoader = null;
+ private boolean acceptGzipCompression = true;
@Override
public PulsarAdmin build() throws PulsarClientException {
- return new PulsarAdminImpl(conf.getServiceUrl(), conf,
clientBuilderClassLoader);
+ return new PulsarAdminImpl(conf.getServiceUrl(), conf,
clientBuilderClassLoader, acceptGzipCompression);
}
public PulsarAdminBuilderImpl() {
@@ -54,13 +55,24 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
@Override
public PulsarAdminBuilder clone() {
- return new PulsarAdminBuilderImpl(conf.clone());
+ PulsarAdminBuilderImpl pulsarAdminBuilder = new
PulsarAdminBuilderImpl(conf.clone());
+ pulsarAdminBuilder.clientBuilderClassLoader = clientBuilderClassLoader;
+ pulsarAdminBuilder.acceptGzipCompression = acceptGzipCompression;
+ return pulsarAdminBuilder;
}
@Override
public PulsarAdminBuilder loadConf(Map<String, Object> config) {
conf = ConfigurationDataUtils.loadData(config, conf,
ClientConfigurationData.class);
setAuthenticationFromPropsIfAvailable(conf);
+ if (config.containsKey("acceptGzipCompression")) {
+ Object acceptGzipCompressionObj =
config.get("acceptGzipCompression");
+ if (acceptGzipCompressionObj instanceof Boolean) {
+ acceptGzipCompression = (Boolean) acceptGzipCompressionObj;
+ } else {
+ acceptGzipCompression =
Boolean.parseBoolean(acceptGzipCompressionObj.toString());
+ }
+ }
return this;
}
@@ -227,4 +239,10 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
this.clientBuilderClassLoader = clientBuilderClassLoader;
return this;
}
+
+ @Override
+ public PulsarAdminBuilder acceptGzipCompression(boolean
acceptGzipCompression) {
+ this.acceptGzipCompression = acceptGzipCompression;
+ return this;
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index 259ca90cc08..39347850cf6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -106,6 +106,12 @@ public class PulsarAdminImpl implements PulsarAdmin {
public PulsarAdminImpl(String serviceUrl, ClientConfigurationData
clientConfigData,
ClassLoader clientBuilderClassLoader) throws
PulsarClientException {
+ this(serviceUrl, clientConfigData, clientBuilderClassLoader, true);
+ }
+
+ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData
clientConfigData,
+ ClassLoader clientBuilderClassLoader, boolean
acceptGzipCompression)
+ throws PulsarClientException {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service URL needs
to be specified");
this.clientConfigData = clientConfigData;
@@ -119,7 +125,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
}
AsyncHttpConnectorProvider asyncConnectorProvider = new
AsyncHttpConnectorProvider(clientConfigData,
- clientConfigData.getAutoCertRefreshSeconds());
+ clientConfigData.getAutoCertRefreshSeconds(),
acceptGzipCompression);
ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 9ed2b8564f2..9ad0ce5029c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -83,19 +83,23 @@ public class AsyncHttpConnector implements Connector {
private final PulsarServiceNameResolver serviceNameResolver;
private final ScheduledExecutorService delayer =
Executors.newScheduledThreadPool(1,
new DefaultThreadFactory("delayer"));
+ private final boolean acceptGzipCompression;
- public AsyncHttpConnector(Client client, ClientConfigurationData conf, int
autoCertRefreshTimeSeconds) {
+ public AsyncHttpConnector(Client client, ClientConfigurationData conf, int
autoCertRefreshTimeSeconds,
+ boolean acceptGzipCompression) {
this((int)
client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
(int)
client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000,
autoCertRefreshTimeSeconds,
- conf);
+ conf, acceptGzipCompression);
}
@SneakyThrows
public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs,
int requestTimeoutMs,
- int autoCertRefreshTimeSeconds,
ClientConfigurationData conf) {
+ int autoCertRefreshTimeSeconds,
ClientConfigurationData conf,
+ boolean acceptGzipCompression) {
+ this.acceptGzipCompression = acceptGzipCompression;
DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
@@ -339,6 +343,10 @@ public class AsyncHttpConnector implements Connector {
}
});
+ if (acceptGzipCompression) {
+ builder.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
+ }
+
return builder.execute().toCompletableFuture();
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
index 4467f77d1f9..d20dc848494 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -32,16 +32,19 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {
private final ClientConfigurationData conf;
private Connector connector;
private final int autoCertRefreshTimeSeconds;
+ private final boolean acceptGzipCompression;
- public AsyncHttpConnectorProvider(ClientConfigurationData conf, int
autoCertRefreshTimeSeconds) {
+ public AsyncHttpConnectorProvider(ClientConfigurationData conf, int
autoCertRefreshTimeSeconds,
+ boolean acceptGzipCompression) {
this.conf = conf;
this.autoCertRefreshTimeSeconds = autoCertRefreshTimeSeconds;
+ this.acceptGzipCompression = acceptGzipCompression;
}
@Override
public Connector getConnector(Client client, Configuration runtimeConfig) {
if (connector == null) {
- connector = new AsyncHttpConnector(client, conf,
autoCertRefreshTimeSeconds);
+ connector = new AsyncHttpConnector(client, conf,
autoCertRefreshTimeSeconds, acceptGzipCompression);
}
return connector;
}
@@ -50,6 +53,6 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {
public AsyncHttpConnector getConnector(int connectTimeoutMs, int
readTimeoutMs, int requestTimeoutMs,
int autoCertRefreshTimeSeconds) {
return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs,
requestTimeoutMs, autoCertRefreshTimeSeconds,
- conf);
+ conf, acceptGzipCompression);
}
}
diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminGzipTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminGzipTest.java
new file mode 100644
index 00000000000..2bfa382be10
--- /dev/null
+++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminGzipTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.absent;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static org.testng.Assert.assertEquals;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.zip.GZIPOutputStream;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class PulsarAdminGzipTest {
+ WireMockServer server;
+
+ @BeforeClass(alwaysRun = true)
+ void beforeClass() throws IOException {
+ server = new WireMockServer(WireMockConfiguration.wireMockConfig()
+ .port(0));
+ server.start();
+ }
+
+ @AfterClass(alwaysRun = true)
+ void afterClass() {
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ static byte[] gzipContent(String content) throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ try(GZIPOutputStream gzipOutputStream = new
GZIPOutputStream(byteArrayOutputStream)) {
+ gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
+ }
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ @AfterMethod
+ void resetAllMocks() {
+ server.resetAll();
+ }
+
+ @Test
+ public void testGzipRequestedGzipResponse() throws Exception {
+ server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+ .withHeader("Accept-Encoding", equalTo("gzip"))
+ .willReturn(aResponse()
+ .withHeader("Content-Type", "application/json")
+ .withHeader("Content-Encoding", "gzip")
+ .withBody(gzipContent("[\"gzip-test\",
\"gzip-test2\"]"))));
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl("http://localhost:" + server.port())
+ .acceptGzipCompression(true)
+ .build();
+
+ assertEquals(admin.clusters().getClusters(),
Arrays.asList("gzip-test", "gzip-test2"));
+ }
+
+ @Test
+ public void testGzipRequestedNoGzipResponse() throws Exception {
+ server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+ .withHeader("Accept-Encoding", equalTo("gzip"))
+ .willReturn(aResponse()
+ .withHeader("Content-Type", "application/json")
+ .withBody("[\"test\", \"test2\"]")));
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl("http://localhost:" + server.port())
+ .acceptGzipCompression(true)
+ .build();
+
+ assertEquals(admin.clusters().getClusters(), Arrays.asList("test",
"test2"));
+ }
+
+ @Test
+ public void testNoGzipRequestedNoGzipResponse() throws Exception {
+ server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+ .withHeader("Accept-Encoding", absent())
+ .willReturn(aResponse()
+ .withHeader("Content-Type", "application/json")
+ .withBody("[\"test\", \"test2\"]")));
+
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder()
+ .serviceHttpUrl("http://localhost:" + server.port())
+ .acceptGzipCompression(false)
+ .build();
+
+ assertEquals(admin.clusters().getClusters(), Arrays.asList("test",
"test2"));
+ }
+}