This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6d140af3fdb5edec016eca8f361d552e108d7e45 Author: Lari Hotari <[email protected]> AuthorDate: Thu Aug 8 15:04:04 2024 +0300 [improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder (#22541) (cherry picked from commit 3e7dbb4957bf5daae59127cd66e4da3802072853) (cherry picked from commit 4c480fd4ec16af1ffeac5898b60415a503f6ed02) --- distribution/server/src/assemble/LICENSE.bin.txt | 20 +- distribution/shell/src/assemble/LICENSE.bin.txt | 13 +- pom.xml | 9 +- .../pulsar/client/admin/PulsarAdminBuilder.java | 26 ++ pulsar-client-admin-shaded/pom.xml | 5 + .../client/admin/internal/FunctionsImpl.java | 70 ++-- .../pulsar/client/admin/internal/PackagesImpl.java | 68 ++-- .../admin/internal/PulsarAdminBuilderImpl.java | 24 ++ .../client/admin/internal/PulsarAdminImpl.java | 8 +- .../pulsar/client/admin/internal/SinksImpl.java | 13 +- .../pulsar/client/admin/internal/SourcesImpl.java | 13 +- .../admin/internal/http/AsyncHttpConnector.java | 351 +++++++++++++++------ .../internal/http/AsyncHttpRequestExecutor.java | 48 +++ .../admin/internal/PulsarAdminBuilderImplTest.java | 2 + .../internal/http/AsyncHttpConnectorTest.java | 200 ++++++++++++ pulsar-client-all/pom.xml | 5 + .../apache/pulsar/client/api/ClientBuilder.java | 2 + pulsar-client-shaded/pom.xml | 5 + .../apache/pulsar/client/impl/ConnectionPool.java | 2 +- .../client/impl/conf/ClientConfigurationData.java | 4 +- .../pulsar/client/impl/ClientBuilderImplTest.java | 2 +- pulsar-common/pom.xml | 5 + pulsar-sql/presto-distribution/LICENSE | 20 +- pulsar-sql/presto-distribution/pom.xml | 2 +- 24 files changed, 687 insertions(+), 230 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index b5764d7b87e..c7953eb99f7 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -554,6 +554,8 @@ The Apache Software License, Version 2.0 - com.rabbitmq-amqp-client-5.5.3.jar * RoaringBitmap - org.roaringbitmap-RoaringBitmap-0.9.44.jar + * Spotify completable-futures + - com.spotify-completable-futures-0.3.6.jar BSD 3-clause "New" or "Revised" License * Google auth library @@ -595,15 +597,15 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt - org.glassfish.hk2-osgi-resource-locator-1.0.3.jar - org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar * Jersey - - org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar - - org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar - - org.glassfish.jersey.core-jersey-client-2.41.jar - - org.glassfish.jersey.core-jersey-common-2.41.jar - - org.glassfish.jersey.core-jersey-server-2.41.jar - - org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar - - org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar - - org.glassfish.jersey.media-jersey-media-multipart-2.41.jar - - org.glassfish.jersey.inject-jersey-hk2-2.41.jar + - org.glassfish.jersey.containers-jersey-container-servlet-2.42.jar + - org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar + - org.glassfish.jersey.core-jersey-client-2.42.jar + - org.glassfish.jersey.core-jersey-common-2.42.jar + - org.glassfish.jersey.core-jersey-server-2.42.jar + - org.glassfish.jersey.ext-jersey-entity-filtering-2.42.jar + - org.glassfish.jersey.media-jersey-media-json-jackson-2.42.jar + - org.glassfish.jersey.media-jersey-media-multipart-2.42.jar + - org.glassfish.jersey.inject-jersey-hk2-2.42.jar * Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 24a57ecd2b7..9104aa9b90b 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -404,6 +404,7 @@ The Apache Software License, Version 2.0 * Apache Avro - avro-1.11.3.jar - avro-protobuf-1.11.3.jar + * Spotify completable-futures -- completable-futures-0.3.6.jar BSD 3-clause "New" or "Revised" License * JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt @@ -429,12 +430,12 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt - aopalliance-repackaged-2.6.1.jar - osgi-resource-locator-1.0.3.jar * Jersey - - jersey-client-2.41.jar - - jersey-common-2.41.jar - - jersey-entity-filtering-2.41.jar - - jersey-media-json-jackson-2.41.jar - - jersey-media-multipart-2.41.jar - - jersey-hk2-2.41.jar + - jersey-client-2.42.jar + - jersey-common-2.42.jar + - jersey-entity-filtering-2.42.jar + - jersey-media-json-jackson-2.42.jar + - jersey-media-multipart-2.42.jar + - jersey-hk2-2.42.jar * Mimepull -- mimepull-1.9.15.jar Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt diff --git a/pom.xml b/pom.xml index 0c57b6de246..6dd5459a206 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ flexible messaging model and an intuitive client API.</description> <netty.version>4.1.100.Final</netty.version> <jetty.version>9.4.54.v20240208</jetty.version> <conscrypt.version>2.5.2</conscrypt.version> - <jersey.version>2.41</jersey.version> + <jersey.version>2.42</jersey.version> <athenz.version>1.10.50</athenz.version> <prometheus.version>0.16.0</prometheus.version> <vertx.version>3.9.8</vertx.version> @@ -234,6 +234,7 @@ flexible messaging model and an intuitive client API.</description> <disruptor.version>3.4.3</disruptor.version> <zstd-jni.version>1.5.2-3</zstd-jni.version> <netty-reactive-streams.version>2.0.6</netty-reactive-streams.version> + <completable-futures.version>0.3.6</completable-futures.version> <!-- test dependencies --> <testcontainers.version>1.17.6</testcontainers.version> @@ -597,6 +598,12 @@ flexible messaging model and an intuitive client API.</description> <version>${bookkeeper.version}</version> </dependency> + <dependency> + <groupId>com.spotify</groupId> + <artifactId>completable-futures</artifactId> + <version>${completable-futures.version}</version> + </dependency> + <dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java index 60d4c2dbc71..a23dee741fd 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java @@ -327,4 +327,30 @@ public interface PulsarAdminBuilder { */ PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader); + + /** + * Configures the maximum number of connections that the client library will establish with a single host. + * <p> + * By default, the connection pool maintains up to 16 connections to a single host. This method allows you to + * modify this default behavior and limit the number of connections. + * <p> + * This setting can be useful in scenarios where you want to limit the resources used by the client library, + * or control the level of parallelism for operations so that a single client does not overwhelm + * the Pulsar cluster with too many concurrent connections. + * + * @param maxConnectionsPerHost the maximum number of connections to establish per host. Set to <= 0 to disable + * the limit. + * @return the PulsarAdminBuilder instance, allowing for method chaining + */ + PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost); + + /** + * Sets the maximum idle time for a pooled connection. If a connection is idle for more than the specified + * amount of seconds, it will be released back to the connection pool. + * Defaults to 25 seconds. + * + * @param connectionMaxIdleSeconds the maximum idle time, in seconds, for a pooled connection + * @return the PulsarAdminBuilder instance + */ + PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds); } diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index f259b6076b8..a8487109886 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -123,6 +123,7 @@ <include>com.google.protobuf:protobuf-java</include> <include>com.google.guava:guava</include> <include>com.google.code.gson:gson</include> + <include>com.spotify:completable-futures</include> <include>com.fasterxml.jackson.core</include> <include>io.netty:*</include> <include>org.apache.pulsar:pulsar-common</include> @@ -188,6 +189,10 @@ <exclude>com.google.protobuf.*</exclude> </excludes> </relocation> + <relocation> + <pattern>com.spotify.futures</pattern> + <shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern> + </relocation> <relocation> <pattern>com.fasterxml.jackson</pattern> <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern> diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 750f642f365..e449c719503 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -22,7 +22,6 @@ import static org.asynchttpclient.Dsl.get; import static org.asynchttpclient.Dsl.post; import static org.asynchttpclient.Dsl.put; import com.google.gson.Gson; -import io.netty.handler.codec.http.HttpHeaders; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -41,6 +40,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.FunctionState; @@ -54,10 +54,8 @@ import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.common.policies.data.FunctionStatsImpl; import org.apache.pulsar.common.policies.data.FunctionStatus; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.asynchttpclient.AsyncHandler; -import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncCompletionHandlerBase; import org.asynchttpclient.HttpResponseBodyPart; -import org.asynchttpclient.HttpResponseStatus; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.request.body.multipart.ByteArrayPart; import org.asynchttpclient.request.body.multipart.FilePart; @@ -70,12 +68,14 @@ import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; public class FunctionsImpl extends ComponentResource implements Functions { private final WebTarget functions; - private final AsyncHttpClient asyncHttpClient; + private final AsyncHttpRequestExecutor asyncHttpRequestExecutor; - public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) { + public FunctionsImpl(WebTarget web, Authentication auth, + AsyncHttpRequestExecutor asyncHttpRequestExecutor, + long requestTimeoutMs) { super(auth, requestTimeoutMs); this.functions = web.path("/admin/v3/functions"); - this.asyncHttpClient = asyncHttpClient; + this.asyncHttpRequestExecutor = asyncHttpRequestExecutor; } @Override @@ -277,8 +277,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { // If the function code is built in, we don't need to submit here builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); } - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()) - .toCompletableFuture() + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { future.completeExceptionally( @@ -369,8 +368,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); } - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()) - .toCompletableFuture() + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { future.completeExceptionally( @@ -570,7 +568,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { .addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM)) .addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN)); - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture() + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { future.completeExceptionally( @@ -633,55 +631,31 @@ public class FunctionsImpl extends ComponentResource implements Functions { RequestBuilder builder = get(target.getUri().toASCIIString()); - CompletableFuture<HttpResponseStatus> statusFuture = - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(), - new AsyncHandler<HttpResponseStatus>() { - private HttpResponseStatus status; - - @Override - public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception { - status = responseStatus; - if (status.getStatusCode() != Response.Status.OK.getStatusCode()) { - return State.ABORT; - } - return State.CONTINUE; - } - - @Override - public State onHeadersReceived(HttpHeaders headers) throws Exception { - return State.CONTINUE; - } + CompletableFuture<org.asynchttpclient.Response> responseFuture = + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build(), + () -> new AsyncCompletionHandlerBase() { @Override public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { os.write(bodyPart.getBodyByteBuffer()); return State.CONTINUE; } + }); - @Override - public HttpResponseStatus onCompleted() throws Exception { - return status; - } - - @Override - public void onThrowable(Throwable t) { - } - }).toCompletableFuture(); - - statusFuture - .whenComplete((status, throwable) -> { + responseFuture + .whenComplete((response, throwable) -> { try { os.close(); } catch (IOException e) { future.completeExceptionally(getApiException(e)); } }) - .thenAccept(status -> { - if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) { + .thenAccept(response -> { + if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { future.completeExceptionally( getApiException(Response - .status(status.getStatusCode()) - .entity(status.getStatusText()) + .status(response.getStatusCode()) + .entity(response.getStatusText()) .build())); } else { future.complete(null); @@ -778,7 +752,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { .path("state").path(state.getKey()).getUri().toASCIIString()); builder.addBodyPart(new StringPart("state", ObjectMapperFactory.getThreadLocal() .writeValueAsString(state), MediaType.APPLICATION_JSON)); - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()) + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) .toCompletableFuture() .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { @@ -818,7 +792,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { .addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData)) .addBodyPart(new StringPart("delete", Boolean.toString(delete))); - asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()) + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, builder).build()) .toCompletableFuture() .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java index c2b0f6b7be9..9a191c92bd2 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java @@ -20,7 +20,6 @@ package org.apache.pulsar.client.admin.internal; import static org.asynchttpclient.Dsl.get; import com.google.gson.Gson; -import io.netty.handler.codec.http.HttpHeaders; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -36,15 +35,14 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.pulsar.client.admin.Packages; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.packages.management.core.common.PackageMetadata; import org.apache.pulsar.packages.management.core.common.PackageName; -import org.asynchttpclient.AsyncHandler; -import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncCompletionHandlerBase; import org.asynchttpclient.Dsl; import org.asynchttpclient.HttpResponseBodyPart; -import org.asynchttpclient.HttpResponseStatus; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.request.body.multipart.FilePart; import org.asynchttpclient.request.body.multipart.StringPart; @@ -55,11 +53,12 @@ import org.asynchttpclient.request.body.multipart.StringPart; public class PackagesImpl extends ComponentResource implements Packages { private final WebTarget packages; - private final AsyncHttpClient httpClient; + private final AsyncHttpRequestExecutor asyncHttpRequestExecutor; - public PackagesImpl(WebTarget webTarget, Authentication auth, AsyncHttpClient client, long requestTimeoutMs) { + public PackagesImpl(WebTarget webTarget, Authentication auth, AsyncHttpRequestExecutor asyncHttpRequestExecutor, + long requestTimeoutMs) { super(auth, requestTimeoutMs); - this.httpClient = client; + this.asyncHttpRequestExecutor = asyncHttpRequestExecutor; this.packages = webTarget.path("/admin/v3/packages"); } @@ -98,7 +97,7 @@ public class PackagesImpl extends ComponentResource implements Packages { .post(packages.path(PackageName.get(packageName).toRestPath()).getUri().toASCIIString()) .addBodyPart(new FilePart("file", new File(path), MediaType.APPLICATION_OCTET_STREAM)) .addBodyPart(new StringPart("metadata", new Gson().toJson(metadata), MediaType.APPLICATION_JSON)); - httpClient.executeRequest(addAuthHeaders(packages, builder).build()) + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(packages, builder).build()) .toCompletableFuture() .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { @@ -138,55 +137,30 @@ public class PackagesImpl extends ComponentResource implements Packages { FileChannel os = new FileOutputStream(destinyPath.toFile()).getChannel(); RequestBuilder builder = get(webTarget.getUri().toASCIIString()); - CompletableFuture<HttpResponseStatus> statusFuture = - httpClient.executeRequest(addAuthHeaders(webTarget, builder).build(), - new AsyncHandler<HttpResponseStatus>() { - private HttpResponseStatus status; + CompletableFuture<org.asynchttpclient.Response> responseFuture = + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(webTarget, builder).build(), + () -> new AsyncCompletionHandlerBase() { - @Override - public State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception { - status = httpResponseStatus; - if (status.getStatusCode() != Response.Status.OK.getStatusCode()) { - return State.ABORT; + @Override + public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { + os.write(bodyPart.getBodyByteBuffer()); + return State.CONTINUE; } - return State.CONTINUE; - } - - @Override - public State onHeadersReceived(HttpHeaders httpHeaders) throws Exception { - return State.CONTINUE; - } - - @Override - public State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception { - os.write(httpResponseBodyPart.getBodyByteBuffer()); - return State.CONTINUE; - } - - @Override - public void onThrowable(Throwable throwable) { - // we don't need to handle that throwable and use the returned future to handle it. - } - - @Override - public HttpResponseStatus onCompleted() throws Exception { - return status; - } - }).toCompletableFuture(); - statusFuture - .whenComplete((status, throwable) -> { + }); + responseFuture + .whenComplete((response, throwable) -> { try { os.close(); } catch (IOException e) { future.completeExceptionally(getApiException(throwable)); } }) - .thenAccept(status -> { - if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) { + .thenAccept(response -> { + if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { future.completeExceptionally( getApiException(Response - .status(status.getStatusCode()) - .entity(status.getStatusText()) + .status(response.getStatusCode()) + .entity(response.getStatusText()) .build())); } else { future.complete(null); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index 1bcfadc8474..ec57112d762 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -46,6 +46,7 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { public PulsarAdminBuilderImpl() { this.conf = new ClientConfigurationData(); + this.conf.setConnectionsPerBroker(16); } private PulsarAdminBuilderImpl(ClientConfigurationData conf) { @@ -61,6 +62,15 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { public PulsarAdminBuilder loadConf(Map<String, Object> config) { conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class); setAuthenticationFromPropsIfAvailable(conf); + // in ClientConfigurationData, the maxConnectionsPerHost maps to connectionsPerBroker + if (config.containsKey("maxConnectionsPerHost")) { + Object maxConnectionsPerHostObj = config.get("maxConnectionsPerHost"); + if (maxConnectionsPerHostObj instanceof Integer) { + maxConnectionsPerHost((Integer) maxConnectionsPerHostObj); + } else { + maxConnectionsPerHost(Integer.parseInt(maxConnectionsPerHostObj.toString())); + } + } return this; } @@ -227,4 +237,18 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { this.clientBuilderClassLoader = clientBuilderClassLoader; return this; } + + @Override + public PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost) { + // reuse the same configuration as the client, however for the admin client, the connection + // is usually established to a cluster address and not to a broker address + this.conf.setConnectionsPerBroker(maxConnectionsPerHost); + return this; + } + + @Override + public PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds) { + this.conf.setConnectionMaxIdleSeconds(connectionMaxIdleSeconds); + return this; + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index d78394ef936..f944289d8d8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -168,13 +168,13 @@ public class PulsarAdminImpl implements PulsarAdmin { this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, requestTimeoutMs); this.resourceQuotas = new ResourceQuotasImpl(root, auth, requestTimeoutMs); this.lookups = new LookupImpl(root, auth, useTls, requestTimeoutMs, topics); - this.functions = new FunctionsImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs); - this.sources = new SourcesImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs); - this.sinks = new SinksImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs); + this.functions = new FunctionsImpl(root, auth, asyncHttpConnector, requestTimeoutMs); + this.sources = new SourcesImpl(root, auth, asyncHttpConnector, requestTimeoutMs); + this.sinks = new SinksImpl(root, auth, asyncHttpConnector, requestTimeoutMs); this.worker = new WorkerImpl(root, auth, requestTimeoutMs); this.schemas = new SchemasImpl(root, auth, requestTimeoutMs); this.bookies = new BookiesImpl(root, auth, requestTimeoutMs); - this.packages = new PackagesImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs); + this.packages = new PackagesImpl(root, auth, asyncHttpConnector, requestTimeoutMs); this.transactions = new TransactionsImpl(root, auth, requestTimeoutMs); if (originalCtxLoader != null) { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java index 79f52f8a669..59d5e1f3736 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java @@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Sink; import org.apache.pulsar.client.admin.Sinks; +import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.functions.UpdateOptionsImpl; @@ -42,7 +43,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.request.body.multipart.FilePart; import org.asynchttpclient.request.body.multipart.StringPart; @@ -53,12 +53,13 @@ import org.glassfish.jersey.media.multipart.FormDataMultiPart; public class SinksImpl extends ComponentResource implements Sinks, Sink { private final WebTarget sink; - private final AsyncHttpClient asyncHttpClient; + private final AsyncHttpRequestExecutor asyncHttpRequestExecutor; - public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) { + public SinksImpl(WebTarget web, Authentication auth, AsyncHttpRequestExecutor asyncHttpRequestExecutor, + long requestTimeoutMs) { super(auth, requestTimeoutMs); this.sink = web.path("/admin/v3/sink"); - this.asyncHttpClient = asyncHttpClient; + this.asyncHttpRequestExecutor = asyncHttpRequestExecutor; } @Override @@ -212,7 +213,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink { // If the function code is built in, we don't need to submit here builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); } - asyncHttpClient.executeRequest(addAuthHeaders(sink, builder).build()) + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(sink, builder).build()) .toCompletableFuture() .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { @@ -301,7 +302,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink { // If the function code is built in, we don't need to submit here builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); } - asyncHttpClient.executeRequest(addAuthHeaders(sink, builder).build()) + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(sink, builder).build()) .toCompletableFuture() .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java index bfd5c86ac1b..2dc27c829c8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java @@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Source; import org.apache.pulsar.client.admin.Sources; +import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.functions.UpdateOptionsImpl; @@ -41,7 +42,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.io.SourceConfig; import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.request.body.multipart.FilePart; import org.asynchttpclient.request.body.multipart.StringPart; @@ -52,12 +52,13 @@ import org.glassfish.jersey.media.multipart.FormDataMultiPart; public class SourcesImpl extends ComponentResource implements Sources, Source { private final WebTarget source; - private final AsyncHttpClient asyncHttpClient; + private final AsyncHttpRequestExecutor asyncHttpRequestExecutor; - public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) { + public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpRequestExecutor asyncHttpRequestExecutor, + long requestTimeoutMs) { super(auth, requestTimeoutMs); this.source = web.path("/admin/v3/source"); - this.asyncHttpClient = asyncHttpClient; + this.asyncHttpRequestExecutor = asyncHttpRequestExecutor; } @Override @@ -196,7 +197,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source { // If the function code is built in, we don't need to submit here builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); } - asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build()) + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(source, builder).build()) .toCompletableFuture() .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { @@ -274,7 +275,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source { // If the function code is built in, we don't need to submit here builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); } - asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build()) + asyncHttpRequestExecutor.executeRequest(addAuthHeaders(source, builder).build()) .toCompletableFuture() .thenAccept(response -> { if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index cfaf4aa5e4d..751110798e2 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -18,6 +18,17 @@ */ package org.apache.pulsar.client.admin.internal.http; +import static org.asynchttpclient.util.HttpConstants.Methods.GET; +import static org.asynchttpclient.util.HttpConstants.Methods.HEAD; +import static org.asynchttpclient.util.HttpConstants.Methods.OPTIONS; +import static org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.FOUND_302; +import static org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.MOVED_PERMANENTLY_301; +import static org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.PERMANENT_REDIRECT_308; +import static org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.SEE_OTHER_303; +import static org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.TEMPORARY_REDIRECT_307; +import static org.asynchttpclient.util.MiscUtils.isNonEmpty; +import com.spotify.futures.ConcurrencyReducer; +import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.ssl.SslContext; @@ -27,9 +38,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.security.GeneralSecurityException; import java.time.Duration; +import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -37,32 +51,39 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import javax.net.ssl.SSLContext; +import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response.Status; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.KeyStoreParams; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; +import org.apache.pulsar.client.impl.ServiceNameResolver; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.WithSNISslEngineFactory; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext; +import org.asynchttpclient.AsyncCompletionHandlerBase; +import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClientConfig; import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; import org.asynchttpclient.Response; +import org.asynchttpclient.SslEngineFactory; import org.asynchttpclient.channel.DefaultKeepAliveStrategy; import org.asynchttpclient.netty.ssl.JsseSslEngineFactory; +import org.asynchttpclient.uri.Uri; import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.ClientRequest; import org.glassfish.jersey.client.ClientResponse; @@ -73,16 +94,18 @@ import org.glassfish.jersey.client.spi.Connector; * Customized Jersey client connector with multi-host support. */ @Slf4j -public class AsyncHttpConnector implements Connector { +public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { private static final TimeoutException REQUEST_TIMEOUT_EXCEPTION = FutureUtil.createTimeoutException("Request timeout", AsyncHttpConnector.class, "retryOrTimeout(...)"); + private static final int DEFAULT_MAX_QUEUE_SIZE_PER_HOST = 10000; @Getter private final AsyncHttpClient httpClient; private final Duration requestTimeout; private final int maxRetries; - private final PulsarServiceNameResolver serviceNameResolver; + private final ServiceNameResolver serviceNameResolver; private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("delayer")); + private final Map<String, ConcurrencyReducer<Response>> concurrencyReducers = new ConcurrentHashMap<>(); public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds) { this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT), @@ -96,9 +119,46 @@ public class AsyncHttpConnector implements Connector { public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs, int autoCertRefreshTimeSeconds, ClientConfigurationData conf) { + Validate.notEmpty(conf.getServiceUrl(), "Service URL is not provided"); + serviceNameResolver = new PulsarServiceNameResolver(); + String serviceUrl = conf.getServiceUrl(); + serviceNameResolver.updateServiceUrl(serviceUrl); + AsyncHttpClientConfig asyncHttpClientConfig = + createAsyncHttpClientConfig(conf, connectTimeoutMs, readTimeoutMs, requestTimeoutMs, + autoCertRefreshTimeSeconds); + httpClient = createAsyncHttpClient(asyncHttpClientConfig); + this.requestTimeout = requestTimeoutMs > 0 ? Duration.ofMillis(requestTimeoutMs) : null; + this.maxRetries = httpClient.getConfig().getMaxRequestRetry(); + } + + private AsyncHttpClientConfig createAsyncHttpClientConfig(ClientConfigurationData conf, int connectTimeoutMs, + int readTimeoutMs, + int requestTimeoutMs, int autoCertRefreshTimeSeconds) + throws GeneralSecurityException, IOException { DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + configureAsyncHttpClientConfig(conf, connectTimeoutMs, readTimeoutMs, requestTimeoutMs, confBuilder); + if (conf.getServiceUrl().startsWith("https://")) { + configureAsyncHttpClientSslEngineFactory(conf, autoCertRefreshTimeSeconds, confBuilder); + } + AsyncHttpClientConfig asyncHttpClientConfig = confBuilder.build(); + return asyncHttpClientConfig; + } + + private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int connectTimeoutMs, int readTimeoutMs, + int requestTimeoutMs, + DefaultAsyncHttpClientConfig.Builder confBuilder) { + if (conf.getConnectionsPerBroker() > 0) { + confBuilder.setMaxConnectionsPerHost(conf.getConnectionsPerBroker()); + // Use the request timeout value for acquireFreeChannelTimeout so that we don't need to add + // yet another configuration property. When the ConcurrencyReducer is in use, it shouldn't be necessary to + // wait for a free channel since the ConcurrencyReducer will queue the requests. + confBuilder.setAcquireFreeChannelTimeout(conf.getRequestTimeoutMs()); + } + if (conf.getConnectionMaxIdleSeconds() > 0) { + confBuilder.setPooledConnectionIdleTimeout(conf.getConnectionMaxIdleSeconds() * 1000); + } confBuilder.setUseProxyProperties(true); - confBuilder.setFollowRedirect(true); + confBuilder.setFollowRedirect(false); confBuilder.setRequestTimeout(conf.getRequestTimeoutMs()); confBuilder.setConnectTimeout(connectTimeoutMs); confBuilder.setReadTimeout(readTimeoutMs); @@ -114,75 +174,75 @@ public class AsyncHttpConnector implements Connector { && super.keepAlive(remoteAddress, ahcRequest, request, response); } }); + confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable()); + } - serviceNameResolver = new PulsarServiceNameResolver(); - if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())) { - serviceNameResolver.updateServiceUrl(conf.getServiceUrl()); - if (conf.getServiceUrl().startsWith("https://")) { - // Set client key and certificate if available - AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(); - - if (conf.isUseKeyStoreTls()) { - KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : - new KeyStoreParams(conf.getTlsKeyStoreType(), conf.getTlsKeyStorePath(), - conf.getTlsKeyStorePassword()); - - final SSLContext sslCtx = KeyStoreSSLContext.createClientSslContext( - conf.getSslProvider(), - params.getKeyStoreType(), - params.getKeyStorePath(), - params.getKeyStorePassword(), - conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustStoreType(), - conf.getTlsTrustStorePath(), - conf.getTlsTrustStorePassword(), - conf.getTlsCiphers(), - conf.getTlsProtocols()); - - JsseSslEngineFactory sslEngineFactory = new JsseSslEngineFactory(sslCtx); - confBuilder.setSslEngineFactory(sslEngineFactory); - } else { - SslProvider sslProvider = null; - if (conf.getSslProvider() != null) { - sslProvider = SslProvider.valueOf(conf.getSslProvider()); - } - SslContext sslCtx = null; - if (authData.hasDataForTls()) { - sslCtx = authData.getTlsTrustStoreStream() == null - ? SecurityUtility.createAutoRefreshSslContextForClient( - sslProvider, - conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustCertsFilePath(), authData.getTlsCerificateFilePath(), - authData.getTlsPrivateKeyFilePath(), null, autoCertRefreshTimeSeconds, delayer) - : SecurityUtility.createNettySslContextForClient( - sslProvider, - conf.isTlsAllowInsecureConnection(), - authData.getTlsTrustStoreStream(), authData.getTlsCertificates(), - authData.getTlsPrivateKey(), - conf.getTlsCiphers(), - conf.getTlsProtocols()); - } else { - sslCtx = SecurityUtility.createNettySslContextForClient( - sslProvider, - conf.isTlsAllowInsecureConnection(), - conf.getTlsTrustCertsFilePath(), - conf.getTlsCertificateFilePath(), - conf.getTlsKeyFilePath(), - conf.getTlsCiphers(), - conf.getTlsProtocols()); - } - confBuilder.setSslContext(sslCtx); - if (!conf.isTlsHostnameVerificationEnable()) { - confBuilder.setSslEngineFactory(new WithSNISslEngineFactory(serviceNameResolver - .resolveHostUri().getHost())); - } - } + protected AsyncHttpClient createAsyncHttpClient(AsyncHttpClientConfig asyncHttpClientConfig) { + return new DefaultAsyncHttpClient(asyncHttpClientConfig); + } + + private void configureAsyncHttpClientSslEngineFactory(ClientConfigurationData conf, int autoCertRefreshTimeSeconds, + DefaultAsyncHttpClientConfig.Builder confBuilder) + throws GeneralSecurityException, IOException { + // Set client key and certificate if available + AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(); + + SslEngineFactory sslEngineFactory = null; + if (conf.isUseKeyStoreTls()) { + KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : + new KeyStoreParams(conf.getTlsKeyStoreType(), conf.getTlsKeyStorePath(), + conf.getTlsKeyStorePassword()); + + final SSLContext sslCtx = KeyStoreSSLContext.createClientSslContext( + conf.getSslProvider(), + params.getKeyStoreType(), + params.getKeyStorePath(), + params.getKeyStorePassword(), + conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustStoreType(), + conf.getTlsTrustStorePath(), + conf.getTlsTrustStorePassword(), + conf.getTlsCiphers(), + conf.getTlsProtocols()); + + sslEngineFactory = new JsseSslEngineFactory(sslCtx); + confBuilder.setSslEngineFactory(sslEngineFactory); + } else { + SslProvider sslProvider = null; + if (conf.getSslProvider() != null) { + sslProvider = SslProvider.valueOf(conf.getSslProvider()); + } + SslContext sslCtx = null; + if (authData.hasDataForTls()) { + sslCtx = authData.getTlsTrustStoreStream() == null + ? SecurityUtility.createAutoRefreshSslContextForClient( + sslProvider, + conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustCertsFilePath(), authData.getTlsCerificateFilePath(), + authData.getTlsPrivateKeyFilePath(), null, autoCertRefreshTimeSeconds, delayer) + : SecurityUtility.createNettySslContextForClient( + sslProvider, + conf.isTlsAllowInsecureConnection(), + authData.getTlsTrustStoreStream(), authData.getTlsCertificates(), + authData.getTlsPrivateKey(), + conf.getTlsCiphers(), + conf.getTlsProtocols()); + } else { + sslCtx = SecurityUtility.createNettySslContextForClient( + sslProvider, + conf.isTlsAllowInsecureConnection(), + conf.getTlsTrustCertsFilePath(), + conf.getTlsCertificateFilePath(), + conf.getTlsKeyFilePath(), + conf.getTlsCiphers(), + conf.getTlsProtocols()); + } + confBuilder.setSslContext(sslCtx); + if (!conf.isTlsHostnameVerificationEnable()) { + confBuilder.setSslEngineFactory(new WithSNISslEngineFactory(serviceNameResolver + .resolveHostUri().getHost())); } - confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable()); } - httpClient = new DefaultAsyncHttpClient(confBuilder.build()); - this.requestTimeout = requestTimeoutMs > 0 ? Duration.ofMillis(requestTimeoutMs) : null; - this.maxRetries = httpClient.getConfig().getMaxRequestRetry(); } @Override @@ -202,9 +262,8 @@ public class AsyncHttpConnector implements Connector { try { return future.get(); } catch (InterruptedException | ExecutionException e) { - log.error(e.getMessage()); + throw new ProcessingException(e.getCause()); } - return null; } private URI replaceWithNew(InetSocketAddress address, URI uri) { @@ -263,6 +322,8 @@ public class AsyncHttpConnector implements Connector { return resultFuture; } + // TODO: There are problems with this solution since AsyncHttpClient already contains logic to retry requests. + // This solution doesn't contain backoff handling. private <T> void retryOperation( final CompletableFuture<T> resultFuture, final Supplier<CompletableFuture<T>> operation, @@ -274,9 +335,13 @@ public class AsyncHttpConnector implements Connector { operationFuture.whenComplete( (t, throwable) -> { if (throwable != null) { + throwable = FutureUtil.unwrapCompletionException(throwable); if (throwable instanceof CancellationException) { resultFuture.completeExceptionally( new RetryException("Operation future was cancelled.", throwable)); + } else if (throwable instanceof MaxRedirectException) { + // don't retry on max redirect + resultFuture.completeExceptionally(throwable); } else { if (retries > 0) { if (log.isDebugEnabled()) { @@ -316,7 +381,129 @@ public class AsyncHttpConnector implements Connector { } } + public static class MaxRedirectException extends Exception { + public MaxRedirectException(String msg) { + super(msg, null, true, false); + } + } + protected CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) { + Request preparedRequest; + try { + preparedRequest = prepareRequest(host, request); + } catch (IOException e) { + return FutureUtil.failedFuture(e); + } + return executeRequest(preparedRequest); + } + + public CompletableFuture<Response> executeRequest(Request request) { + return executeRequest(request, () -> new AsyncCompletionHandlerBase()); + } + + public CompletableFuture<Response> executeRequest(Request request, + Supplier<AsyncHandler<Response>> handlerSupplier) { + return executeRequest(request, handlerSupplier, 0); + } + + private CompletableFuture<Response> executeRequest(Request request, + Supplier<AsyncHandler<Response>> handlerSupplier, + int redirectCount) { + int maxRedirects = httpClient.getConfig().getMaxRedirects(); + if (redirectCount > maxRedirects) { + return FutureUtil.failedFuture( + new MaxRedirectException("Maximum redirect reached: " + maxRedirects + " uri:" + request.getUri())); + } + CompletableFuture<Response> responseFuture; + if (httpClient.getConfig().getMaxConnectionsPerHost() > 0) { + String hostAndPort = request.getUri().getHost() + ":" + request.getUri().getPort(); + ConcurrencyReducer<Response> responseConcurrencyReducer = concurrencyReducers.computeIfAbsent(hostAndPort, + h -> ConcurrencyReducer.create(httpClient.getConfig().getMaxConnectionsPerHost(), + DEFAULT_MAX_QUEUE_SIZE_PER_HOST)); + responseFuture = responseConcurrencyReducer.add(() -> doExecuteRequest(request, handlerSupplier)); + } else { + responseFuture = doExecuteRequest(request, handlerSupplier); + } + CompletableFuture<Response> futureWithRedirect = responseFuture.thenCompose(response -> { + if (isRedirectStatusCode(response.getStatusCode())) { + return executeRedirect(request, response, handlerSupplier, redirectCount); + } + return CompletableFuture.completedFuture(response); + }); + futureWithRedirect.whenComplete((response, throwable) -> { + // propagate cancellation or timeout to the original response future + responseFuture.cancel(false); + }); + return futureWithRedirect; + } + + private CompletableFuture<Response> executeRedirect(Request request, Response response, + Supplier<AsyncHandler<Response>> handlerSupplier, + int redirectCount) { + String originalMethod = request.getMethod(); + int statusCode = response.getStatusCode(); + boolean switchToGet = !originalMethod.equals(GET) + && !originalMethod.equals(OPTIONS) && !originalMethod.equals(HEAD) && ( + statusCode == MOVED_PERMANENTLY_301 || statusCode == SEE_OTHER_303 || statusCode == FOUND_302); + boolean keepBody = statusCode == TEMPORARY_REDIRECT_307 || statusCode == PERMANENT_REDIRECT_308; + String location = response.getHeader(HttpHeaders.LOCATION); + Uri newUri = Uri.create(request.getUri(), location); + BoundRequestBuilder builder = httpClient.prepareRequest(request); + if (switchToGet) { + builder.setMethod(GET); + } + builder.setUri(newUri); + if (keepBody) { + builder.setCharset(request.getCharset()); + if (isNonEmpty(request.getFormParams())) { + builder.setFormParams(request.getFormParams()); + } else if (request.getStringData() != null) { + builder.setBody(request.getStringData()); + } else if (request.getByteData() != null){ + builder.setBody(request.getByteData()); + } else if (request.getByteBufferData() != null) { + builder.setBody(request.getByteBufferData()); + } else if (request.getBodyGenerator() != null) { + builder.setBody(request.getBodyGenerator()); + } else if (isNonEmpty(request.getBodyParts())) { + builder.setBodyParts(request.getBodyParts()); + } + } else { + builder.resetFormParams(); + builder.resetNonMultipartData(); + builder.resetMultipartData(); + io.netty.handler.codec.http.HttpHeaders headers = new DefaultHttpHeaders(); + headers.add(request.getHeaders()); + headers.remove(HttpHeaders.CONTENT_LENGTH); + headers.remove(HttpHeaders.CONTENT_TYPE); + headers.remove(HttpHeaders.CONTENT_ENCODING); + builder.setHeaders(headers); + } + return executeRequest(builder.build(), handlerSupplier, redirectCount + 1); + } + + private static boolean isRedirectStatusCode(int statusCode) { + return statusCode == MOVED_PERMANENTLY_301 || statusCode == FOUND_302 || statusCode == SEE_OTHER_303 + || statusCode == TEMPORARY_REDIRECT_307 || statusCode == PERMANENT_REDIRECT_308; + } + + private CompletableFuture<Response> doExecuteRequest(Request request, + Supplier<AsyncHandler<Response>> handlerSupplier) { + ListenableFuture<Response> responseFuture = + httpClient.executeRequest(request, handlerSupplier.get()); + CompletableFuture<Response> completableFuture = responseFuture.toCompletableFuture(); + completableFuture.whenComplete((response, throwable) -> { + throwable = FutureUtil.unwrapCompletionException(throwable); + if (throwable != null && (throwable instanceof CancellationException + || throwable instanceof TimeoutException)) { + // abort the request if the future is cancelled or timed out + responseFuture.abort(throwable); + } + }); + return completableFuture; + } + + private Request prepareRequest(InetSocketAddress host, ClientRequest request) throws IOException { ClientRequest currentRequest = new ClientRequest(request); URI newUri = replaceWithNew(host, currentRequest.getUri()); currentRequest.setUri(newUri); @@ -327,14 +514,7 @@ public class AsyncHttpConnector implements Connector { if (currentRequest.hasEntity()) { ByteArrayOutputStream outStream = new ByteArrayOutputStream(); currentRequest.setStreamProvider(contentLength -> outStream); - try { - currentRequest.writeEntity(); - } catch (IOException e) { - CompletableFuture<Response> r = new CompletableFuture<>(); - r.completeExceptionally(e); - return r; - } - + currentRequest.writeEntity(); builder.setBody(outStream.toByteArray()); } @@ -344,16 +524,7 @@ public class AsyncHttpConnector implements Connector { } }); - ListenableFuture<Response> responseFuture = builder.execute(); - CompletableFuture<Response> completableFuture = responseFuture.toCompletableFuture(); - completableFuture.whenComplete((response, throwable) -> { - if (throwable != null && (throwable instanceof CancellationException - || throwable instanceof TimeoutException)) { - // abort the request if the future is cancelled or timed out - responseFuture.abort(throwable); - } - }); - return completableFuture; + return builder.build(); } @Override diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java new file mode 100644 index 00000000000..25810b8b1cf --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin.internal.http; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.Request; +import org.asynchttpclient.Response; + +/** + * Interface for executing HTTP requests asynchronously. + * This is used internally in the Pulsar Admin client for executing HTTP requests that by-pass the Jersey client + * and use the AsyncHttpClient API directly. + */ +public interface AsyncHttpRequestExecutor { + /** + * Execute the given HTTP request asynchronously. + * + * @param request the HTTP request to execute + * @return a future that will be completed with the HTTP response + */ + CompletableFuture<Response> executeRequest(Request request); + /** + * Execute the given HTTP request asynchronously. + * + * @param request the HTTP request to execute + * @param handlerSupplier a supplier for the async handler to use for the request + * @return a future that will be completed with the HTTP response + */ + CompletableFuture<Response> executeRequest(Request request, Supplier<AsyncHandler<Response>> handlerSupplier); +} diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java index 9aac263d54e..7042f46694c 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java @@ -65,6 +65,7 @@ public class PulsarAdminBuilderImplTest { config.put("autoCertRefreshSeconds", 20); config.put("connectionTimeoutMs", 30); config.put("readTimeoutMs", 40); + config.put("maxConnectionsPerHost", 50); PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().loadConf(config); PulsarAdminImpl admin = (PulsarAdminImpl) adminBuilder.build(); ClientConfigurationData clientConfigData = admin.getClientConfigData(); @@ -72,6 +73,7 @@ public class PulsarAdminBuilderImplTest { Assert.assertEquals(clientConfigData.getAutoCertRefreshSeconds(), 20); Assert.assertEquals(clientConfigData.getConnectionTimeoutMs(), 30); Assert.assertEquals(clientConfigData.getReadTimeoutMs(), 40); + Assert.assertEquals(clientConfigData.getConnectionsPerBroker(), 50); } @Test diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java index aee3ad48cde..e2676a996bb 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java @@ -20,23 +20,34 @@ package org.apache.pulsar.client.admin.internal.http; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.common.FileSource; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.extension.Parameters; +import com.github.tomakehurst.wiremock.extension.ResponseTransformer; import com.github.tomakehurst.wiremock.stubbing.Scenario; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.util.FutureUtil; +import org.asynchttpclient.Request; +import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.Response; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientRequest; @@ -52,10 +63,74 @@ import org.testng.annotations.Test; public class AsyncHttpConnectorTest { WireMockServer server; + ConcurrencyTestTransformer concurrencyTestTransformer = new ConcurrencyTestTransformer(); + + private static class CopyRequestBodyToResponseBodyTransformer extends ResponseTransformer { + @Override + public com.github.tomakehurst.wiremock.http.Response transform( + com.github.tomakehurst.wiremock.http.Request request, + com.github.tomakehurst.wiremock.http.Response response, FileSource fileSource, Parameters parameters) { + return com.github.tomakehurst.wiremock.http.Response.Builder.like(response) + .body(request.getBodyAsString()) + .build(); + } + + @Override + public String getName() { + return "copy-body"; + } + + @Override + public boolean applyGlobally() { + return false; + } + } + + private static class ConcurrencyTestTransformer extends ResponseTransformer { + private static final long DELAY_MS = 100; + private final AtomicInteger concurrencyCounter = new AtomicInteger(0); + private final AtomicInteger maxConcurrency = new AtomicInteger(0); + + @Override + public com.github.tomakehurst.wiremock.http.Response transform( + com.github.tomakehurst.wiremock.http.Request request, + com.github.tomakehurst.wiremock.http.Response response, FileSource fileSource, Parameters parameters) { + int currentCounter = concurrencyCounter.incrementAndGet(); + maxConcurrency.updateAndGet(v -> Math.max(v, currentCounter)); + try { + try { + Thread.sleep(DELAY_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return com.github.tomakehurst.wiremock.http.Response.Builder.like(response) + .body(String.valueOf(currentCounter)) + .build(); + } finally { + concurrencyCounter.decrementAndGet(); + } + } + + public int getMaxConcurrency() { + return maxConcurrency.get(); + } + + @Override + public String getName() { + return "concurrency-test"; + } + + @Override + public boolean applyGlobally() { + return false; + } + } @BeforeClass(alwaysRun = true) void beforeClass() throws IOException { server = new WireMockServer(WireMockConfiguration.wireMockConfig() + .extensions(new CopyRequestBodyToResponseBodyTransformer(), concurrencyTestTransformer) + .containerThreads(100) .port(0)); server.start(); } @@ -137,4 +212,129 @@ public class AsyncHttpConnectorTest { assertEquals(scenarioState, "next"); assertTrue(future.isCompletedExceptionally()); } + + @Test + void testMaxRedirects() { + // Redirect to itself to test max redirects + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .willReturn(aResponse() + .withStatus(301) + .withHeader("Location", "http://localhost:" + server.port() + "/admin/v2/clusters"))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + + @Cleanup + AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, + 5000, 0, conf); + + Request request = new RequestBuilder("GET") + .setUrl("http://localhost:" + server.port() + "/admin/v2/clusters") + .build(); + + try { + connector.executeRequest(request).get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof AsyncHttpConnector.MaxRedirectException); + } catch (InterruptedException e) { + fail(); + } + } + + @Test + void testRelativeRedirect() throws ExecutionException, InterruptedException { + doTestRedirect("path2"); + } + + @Test + void testAbsoluteRedirect() throws ExecutionException, InterruptedException { + doTestRedirect("/path2"); + } + + @Test + void testUrlRedirect() throws ExecutionException, InterruptedException { + doTestRedirect("http://localhost:" + server.port() + "/path2"); + } + + private void doTestRedirect(String location) throws InterruptedException, ExecutionException { + server.stubFor(get(urlEqualTo("/path1")) + .willReturn(aResponse() + .withStatus(301) + .withHeader("Location", location))); + + server.stubFor(get(urlEqualTo("/path2")) + .willReturn(aResponse() + .withBody("OK"))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + + @Cleanup + AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, + 5000, 0, conf); + + Request request = new RequestBuilder("GET") + .setUrl("http://localhost:" + server.port() + "/path1") + .build(); + + Response response = connector.executeRequest(request).get(); + assertEquals(response.getResponseBody(), "OK"); + } + + @Test + void testRedirectWithBody() throws ExecutionException, InterruptedException { + server.stubFor(post(urlEqualTo("/path1")) + .willReturn(aResponse() + .withStatus(307) + .withHeader("Location", "/path2"))); + + server.stubFor(post(urlEqualTo("/path2")) + .willReturn(aResponse() + .withTransformers("copy-body"))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + + @Cleanup + AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, + 5000, 0, conf); + + Request request = new RequestBuilder("POST") + .setUrl("http://localhost:" + server.port() + "/path1") + .setBody("Hello world!") + .build(); + + Response response = connector.executeRequest(request).get(); + assertEquals(response.getResponseBody(), "Hello world!"); + } + + @Test + void testMaxConnections() throws ExecutionException, InterruptedException { + server.stubFor(post(urlEqualTo("/concurrency-test")) + .willReturn(aResponse() + .withTransformers("concurrency-test"))); + + ClientConfigurationData conf = new ClientConfigurationData(); + int maxConnections = 10; + conf.setConnectionsPerBroker(maxConnections); + conf.setServiceUrl("http://localhost:" + server.port()); + + @Cleanup + AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, + 5000, 0, conf); + + Request request = new RequestBuilder("POST") + .setUrl("http://localhost:" + server.port() + "/concurrency-test") + .build(); + + List<CompletableFuture<Response>> futures = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + futures.add(connector.executeRequest(request)); + } + FutureUtil.waitForAll(futures).get(); + int maxConcurrency = concurrencyTestTransformer.getMaxConcurrency(); + assertTrue(maxConcurrency > maxConnections / 2 && maxConcurrency <= maxConnections, + "concurrency didn't get limited as expected (max: " + maxConcurrency + ")"); + } } \ No newline at end of file diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml index 330b28e38b8..666754e7bb0 100644 --- a/pulsar-client-all/pom.xml +++ b/pulsar-client-all/pom.xml @@ -165,6 +165,7 @@ <include>com.google.errorprone:*</include> <include>com.google.j2objc:*</include> <include>com.google.code.gson:gson</include> + <include>com.spotify:completable-futures</include> <include>com.fasterxml.jackson.core</include> <include>com.fasterxml.jackson.module</include> <include>com.fasterxml.jackson.core:jackson-core</include> @@ -240,6 +241,10 @@ <exclude>com.google.protobuf.*</exclude> </excludes> </relocation> + <relocation> + <pattern>com.spotify.futures</pattern> + <shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern> + </relocation> <relocation> <pattern>com.fasterxml.jackson</pattern> <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern> diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 6d44f20b8ee..4635958eb5a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -128,6 +128,8 @@ public interface ClientBuilder extends Serializable, Cloneable { /** * Release the connection if it is not used for more than {@param connectionMaxIdleSeconds} seconds. + * Defaults to 25 seconds. + * * @return the client builder instance */ ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds); diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index 3405f705797..f69713b692f 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -145,6 +145,7 @@ <include>com.google.errorprone:*</include> <include>com.google.j2objc:*</include> <include>com.google.code.gson:gson</include> + <include>com.spotify:completable-futures</include> <include>com.fasterxml.jackson.core</include> <include>com.fasterxml.jackson.module</include> <include>com.fasterxml.jackson.core:jackson-core</include> @@ -203,6 +204,10 @@ <exclude>com.google.protobuf.*</exclude> </excludes> </relocation> + <relocation> + <pattern>com.spotify.futures</pattern> + <shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern> + </relocation> <relocation> <pattern>com.fasterxml.jackson</pattern> <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 0b4b7e9a3b9..f675e684d67 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory; public class ConnectionPool implements AutoCloseable { - public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60; + public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 15; protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 9c2b4b8d58a..3c5d889e0c7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl.conf; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import io.swagger.annotations.ApiModelProperty; import java.io.Serializable; import java.net.InetSocketAddress; @@ -45,6 +46,7 @@ import org.apache.pulsar.client.util.Secret; @Data @NoArgsConstructor @AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) public class ClientConfigurationData implements Serializable, Cloneable { private static final long serialVersionUID = 1L; @@ -129,7 +131,7 @@ public class ClientConfigurationData implements Serializable, Cloneable { value = "Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. " + "If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections" ) - private int connectionMaxIdleSeconds = 180; + private int connectionMaxIdleSeconds = 25; @ApiModelProperty( name = "useTcpNoDelay", diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java index ca2efe1f0ad..c71a6d45c2a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java @@ -109,7 +109,7 @@ public class ClientBuilderImplTest { PulsarClient.builder().connectionMaxIdleSeconds(60); // test config not correct. try { - PulsarClient.builder().connectionMaxIdleSeconds(30); + PulsarClient.builder().connectionMaxIdleSeconds(14); Assert.fail(); } catch (IllegalArgumentException e){ } diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 1428463d844..a0f43a81aec 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -163,6 +163,11 @@ <optional>true</optional> </dependency> + <dependency> + <groupId>com.spotify</groupId> + <artifactId>completable-futures</artifactId> + </dependency> + <!-- test --> <dependency> <groupId>org.bouncycastle</groupId> diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index 405ca53f551..71688402a20 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -491,6 +491,8 @@ The Apache Software License, Version 2.0 - auto-service-annotations-1.0.jar * AMQP - amqp-client-5.5.3.jar + * Spotify completable-futures + - completable-futures-0.3.6.jar Protocol Buffers License * Protocol Buffers @@ -544,15 +546,15 @@ CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt - aopalliance-repackaged-2.6.1.jar * Jersey - jaxrs-0.199.jar - - jersey-client-2.41.jar - - jersey-common-2.41.jar - - jersey-container-servlet-2.41.jar - - jersey-container-servlet-core-2.41.jar - - jersey-entity-filtering-2.41.jar - - jersey-hk2-2.41.jar - - jersey-media-json-jackson-2.41.jar - - jersey-media-multipart-2.41.jar - - jersey-server-2.41.jar + - jersey-client-2.42.jar + - jersey-common-2.42.jar + - jersey-container-servlet-2.42.jar + - jersey-container-servlet-core-2.42.jar + - jersey-entity-filtering-2.42.jar + - jersey-hk2-2.42.jar + - jersey-media-json-jackson-2.42.jar + - jersey-media-multipart-2.42.jar + - jersey-server-2.42.jar * JAXB - jaxb-api-2.3.1.jar * RXJava diff --git a/pulsar-sql/presto-distribution/pom.xml b/pulsar-sql/presto-distribution/pom.xml index 6f63c9a5585..e0915145894 100644 --- a/pulsar-sql/presto-distribution/pom.xml +++ b/pulsar-sql/presto-distribution/pom.xml @@ -33,7 +33,7 @@ <properties> <skipBuildDistribution>false</skipBuildDistribution> - <jersey.version>2.41</jersey.version> + <jersey.version>2.42</jersey.version> <objenesis.version>2.6</objenesis.version> <objectsize.version>0.0.12</objectsize.version> <maven.version>3.0.5</maven.version>
