This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e626c0232adef93fdbe43e44fff0eba560b22538
Author: Ruimin MA <[email protected]>
AuthorDate: Tue Nov 4 23:42:33 2025 +0800

    [feat][client] PIP-234: Support shared resources in PulsarAdmin to reduce 
thread usage (#24893)
    
    (cherry picked from commit d186946f6f35f478d7e8e49ca5ac4986cc5568d2)
---
 .../pulsar/client/admin/PulsarAdminBuilder.java    |  14 +++
 .../admin/internal/PulsarAdminBuilderImpl.java     |  12 ++-
 .../client/admin/internal/PulsarAdminImpl.java     |  10 +-
 .../admin/internal/http/AsyncHttpConnector.java    | 104 ++++++++++++++++++---
 .../internal/http/AsyncHttpConnectorProvider.java  |   5 +-
 .../admin/internal/PulsarAdminBuilderImplTest.java |  96 +++++++++++++++++++
 .../internal/http/AsyncHttpConnectorTest.java      |  10 +-
 7 files changed, 228 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index 216376c7eb6..4add07d588a 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
 
 /**
  * Builder class for a {@link PulsarAdmin} instance.
@@ -393,4 +394,17 @@ public interface PulsarAdminBuilder {
      * @throws IllegalArgumentException if the length of description exceeds 64
      */
     PulsarAdminBuilder description(String description);
+
+    /**
+     * Provide a set of shared client resources to be reused by this client.
+     * <p>
+     * Providing a shared resource instance allows PulsarClient instances to 
share resources
+     * (only support IO/event loops, timers, DNS resolver/cache) with other 
PulsarClient
+     * instances, reducing memory footprint and thread usage when creating 
many clients in the same JVM.
+     *
+     * @param sharedResources the shared resources instance created with 
{@link PulsarClientSharedResources#builder()}
+     * @return the adminClient builder instance
+     */
+    PulsarAdminBuilder sharedResources(PulsarClientSharedResources 
sharedResources);
+
 }
\ No newline at end of file
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index a046f13f60e..f94f6890736 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -29,6 +29,8 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
@@ -39,10 +41,12 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
 
     private ClassLoader clientBuilderClassLoader = null;
     private boolean acceptGzipCompression = true;
+    private transient PulsarClientSharedResourcesImpl sharedResources;
 
     @Override
     public PulsarAdmin build() throws PulsarClientException {
-        return new PulsarAdminImpl(conf.getServiceUrl(), conf, 
clientBuilderClassLoader, acceptGzipCompression);
+        return new PulsarAdminImpl(conf.getServiceUrl(), conf,
+                clientBuilderClassLoader, acceptGzipCompression, 
sharedResources);
     }
 
     public PulsarAdminBuilderImpl() {
@@ -292,4 +296,10 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
         this.conf.setDescription(description);
         return this;
     }
+
+    @Override
+    public PulsarAdminBuilder sharedResources(PulsarClientSharedResources 
sharedResources) {
+        this.sharedResources = (PulsarClientSharedResourcesImpl) 
sharedResources;
+        return this;
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index aaea8a89f8d..e4ca7724ca1 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.WebTarget;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.Bookies;
 import org.apache.pulsar.client.admin.BrokerStats;
@@ -56,6 +57,7 @@ import 
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.net.ServiceURI;
@@ -91,6 +93,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
     private final ResourceQuotas resourceQuotas;
     private final ClientConfigurationData clientConfigData;
     private final Client client;
+    @Getter
     private final AsyncHttpConnector asyncHttpConnector;
     private final String serviceUrl;
     private final Lookup lookups;
@@ -106,11 +109,12 @@ public class PulsarAdminImpl implements PulsarAdmin {
 
     public PulsarAdminImpl(String serviceUrl, ClientConfigurationData 
clientConfigData,
                            ClassLoader clientBuilderClassLoader) throws 
PulsarClientException {
-        this(serviceUrl, clientConfigData, clientBuilderClassLoader, true);
+        this(serviceUrl, clientConfigData, clientBuilderClassLoader, true, 
null);
     }
 
     public PulsarAdminImpl(String serviceUrl, ClientConfigurationData 
clientConfigData,
-                           ClassLoader clientBuilderClassLoader, boolean 
acceptGzipCompression)
+                           ClassLoader clientBuilderClassLoader, boolean 
acceptGzipCompression,
+                           PulsarClientSharedResourcesImpl sharedResources)
             throws PulsarClientException {
         checkArgument(StringUtils.isNotBlank(serviceUrl), "Service URL needs 
to be specified");
 
@@ -157,7 +161,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
                 Math.toIntExact(clientConfigData.getConnectionTimeoutMs()),
                 Math.toIntExact(clientConfigData.getReadTimeoutMs()),
                 Math.toIntExact(clientConfigData.getRequestTimeoutMs()),
-                clientConfigData.getAutoCertRefreshSeconds());
+                clientConfigData.getAutoCertRefreshSeconds(), sharedResources);
 
         long requestTimeoutMs = clientConfigData.getRequestTimeoutMs();
         this.clusters = new ClustersImpl(root, auth, requestTimeoutMs);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index b9ef758e3c6..ed17df8bd73 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -28,12 +28,15 @@ import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.SEE_OTH
 import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.TEMPORARY_REDIRECT_307;
 import static org.asynchttpclient.util.MiscUtils.isNonEmpty;
 import com.spotify.futures.ConcurrencyReducer;
+import io.netty.channel.EventLoopGroup;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
+import io.netty.resolver.NameResolver;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.GeneralSecurityException;
@@ -53,6 +56,7 @@ import javax.ws.rs.ProcessingException;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response.Status;
+import lombok.Data;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -60,13 +64,17 @@ import org.apache.commons.lang3.Validate;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.ServiceNameResolver;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.PulsarHttpAsyncSslEngineFactory;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.PulsarSslConfiguration;
 import org.apache.pulsar.common.util.PulsarSslFactory;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.asynchttpclient.AsyncCompletionHandlerBase;
 import org.asynchttpclient.AsyncHandler;
 import org.asynchttpclient.AsyncHttpClient;
@@ -103,6 +111,10 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
             new DefaultThreadFactory("delayer"));
     private ScheduledExecutorService sslRefresher;
     private final boolean acceptGzipCompression;
+    @Getter
+    private final NameResolver<InetAddress> nameResolver;
+    private final EventLoopGroup eventLoopGroup;
+    private final boolean createdEventLoopGroup;
     private final Map<String, ConcurrencyReducer<Response>> 
concurrencyReducers = new ConcurrentHashMap<>();
     private PulsarSslFactory sslFactory;
 
@@ -112,33 +124,66 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
                 (int) 
client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
                 PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000,
                 autoCertRefreshTimeSeconds,
-                conf, acceptGzipCompression);
+                conf, acceptGzipCompression, null);
     }
 
     @SneakyThrows
     public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs,
                               int requestTimeoutMs,
                               int autoCertRefreshTimeSeconds, 
ClientConfigurationData conf,
-                              boolean acceptGzipCompression) {
+                              boolean acceptGzipCompression,
+                              PulsarClientSharedResourcesImpl sharedResources) 
{
         Validate.notEmpty(conf.getServiceUrl(), "Service URL is not provided");
         serviceNameResolver = new PulsarServiceNameResolver();
         String serviceUrl = conf.getServiceUrl();
         serviceNameResolver.updateServiceUrl(serviceUrl);
         this.acceptGzipCompression = acceptGzipCompression;
+        SharedResourceHolder sharedResourceHolder =
+                buildResourcesIfConfigured(sharedResources);
+        this.nameResolver = sharedResourceHolder.getNameResolver();
+        this.eventLoopGroup = sharedResourceHolder.getEventLoopGroup();
+        this.createdEventLoopGroup = sharedResourceHolder.isCreateEventLoop();
         AsyncHttpClientConfig asyncHttpClientConfig =
                 createAsyncHttpClientConfig(conf, connectTimeoutMs, 
readTimeoutMs, requestTimeoutMs,
-                        autoCertRefreshTimeSeconds);
+                        autoCertRefreshTimeSeconds, sharedResources);
         httpClient = createAsyncHttpClient(asyncHttpClientConfig);
         this.requestTimeout = requestTimeoutMs > 0 ? 
Duration.ofMillis(requestTimeoutMs) : null;
         this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
     }
 
+    private SharedResourceHolder buildResourcesIfConfigured(
+            PulsarClientSharedResourcesImpl sharedResources) {
+        EventLoopGroup eventLoopGroup = null;
+        NameResolver<InetAddress> nameResolver = null;
+        boolean createdEventLoopGroup = false;
+        if (sharedResources != null && sharedResources.getDnsResolverGroup() 
!= null) {
+            if (sharedResources.getIoEventLoopGroup() != null) {
+                eventLoopGroup = sharedResources.getIoEventLoopGroup();
+            } else {
+                // build an EventLoopGroup with default value
+                eventLoopGroup = EventLoopUtil.newEventLoopGroup(
+                        Runtime.getRuntime().availableProcessors(), false,
+                        new 
ExecutorProvider.ExtendedThreadFactory("pulsar-admin-client-io",
+                                Thread.currentThread().isDaemon()));
+                createdEventLoopGroup = true;
+            }
+            nameResolver = DnsResolverUtil.adaptToNameResolver(
+                    
sharedResources.getDnsResolverGroup().createAddressResolver(eventLoopGroup));
+        } else {
+            return SharedResourceHolder.EMPTY;
+        }
+        return new SharedResourceHolder(nameResolver, eventLoopGroup, 
createdEventLoopGroup);
+    }
+
     private AsyncHttpClientConfig 
createAsyncHttpClientConfig(ClientConfigurationData conf, int connectTimeoutMs,
                                                               int 
readTimeoutMs,
-                                                              int 
requestTimeoutMs, int autoCertRefreshTimeSeconds)
+                                                              int 
requestTimeoutMs,
+                                                              int 
autoCertRefreshTimeSeconds,
+                                                              
PulsarClientSharedResourcesImpl sharedResources)
             throws GeneralSecurityException, IOException {
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
-        configureAsyncHttpClientConfig(conf, connectTimeoutMs, readTimeoutMs, 
requestTimeoutMs, confBuilder);
+        configureAsyncHttpClientConfig(conf, connectTimeoutMs,
+                readTimeoutMs, requestTimeoutMs, confBuilder, sharedResources);
         if (conf.getServiceUrl().startsWith("https://";)) {
             configureAsyncHttpClientSslEngineFactory(conf, 
autoCertRefreshTimeSeconds, confBuilder);
         }
@@ -148,7 +193,8 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
 
     private void configureAsyncHttpClientConfig(ClientConfigurationData conf, 
int connectTimeoutMs, int readTimeoutMs,
                                                 int requestTimeoutMs,
-                                                
DefaultAsyncHttpClientConfig.Builder confBuilder) {
+                                                
DefaultAsyncHttpClientConfig.Builder confBuilder,
+                                                
PulsarClientSharedResourcesImpl sharedResources) {
         if (conf.getConnectionsPerBroker() > 0) {
             
confBuilder.setMaxConnectionsPerHost(conf.getConnectionsPerBroker());
             // Use the request timeout value for acquireFreeChannelTimeout so 
that we don't need to add
@@ -159,6 +205,14 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
         if (conf.getConnectionMaxIdleSeconds() > 0) {
             
confBuilder.setPooledConnectionIdleTimeout(conf.getConnectionMaxIdleSeconds() * 
1000);
         }
+        if (sharedResources != null) {
+            if (this.eventLoopGroup != null) {
+                confBuilder.setEventLoopGroup(this.eventLoopGroup);
+            }
+            if (sharedResources.getTimer() != null) {
+                confBuilder.setNettyTimer(sharedResources.getTimer());
+            }
+        }
         confBuilder.setCookieStore(null);
         confBuilder.setUseProxyProperties(true);
         confBuilder.setFollowRedirect(false);
@@ -177,7 +231,7 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
                                      HttpRequest request, HttpResponse 
response) {
                 // Close connection upon a server error or per HTTP spec
                 return (response.status().code() / 100 != 5)
-                       && super.keepAlive(remoteAddress, ahcRequest, request, 
response);
+                        && super.keepAlive(remoteAddress, ahcRequest, request, 
response);
             }
         });
         
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
@@ -331,9 +385,9 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
                                                 throwable);
                                     }
                                     resultFuture.completeExceptionally(
-                                        new RetryException("Could not complete 
the operation. Number of retries "
-                                            + "has been exhausted. Failed 
reason: " + throwable.getMessage(),
-                                            throwable));
+                                            new RetryException("Could not 
complete the operation. Number of retries "
+                                                    + "has been exhausted. 
Failed reason: " + throwable.getMessage(),
+                                                    throwable));
                                 }
                             }
                         } else {
@@ -376,7 +430,7 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
     }
 
     public CompletableFuture<Response> executeRequest(Request request,
-                                                       
Supplier<AsyncHandler<Response>> handlerSupplier) {
+                                                      
Supplier<AsyncHandler<Response>> handlerSupplier) {
         return executeRequest(request, handlerSupplier, 0);
     }
 
@@ -426,6 +480,9 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
         if (switchToGet) {
             builder.setMethod(GET);
         }
+        if (this.nameResolver != null) {
+            builder.setNameResolver(this.nameResolver);
+        }
         builder.setUri(newUri);
         if (keepBody) {
             builder.setCharset(request.getCharset());
@@ -433,7 +490,7 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
                 builder.setFormParams(request.getFormParams());
             } else if (request.getStringData() != null) {
                 builder.setBody(request.getStringData());
-            } else if (request.getByteData() != null){
+            } else if (request.getByteData() != null) {
                 builder.setBody(request.getByteData());
             } else if (request.getByteBufferData() != null) {
                 builder.setBody(request.getByteBufferData());
@@ -485,6 +542,9 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
         BoundRequestBuilder builder =
                 httpClient.prepare(currentRequest.getMethod(), 
currentRequest.getUri().toString());
 
+        if (this.nameResolver != null) {
+            builder.setNameResolver(this.nameResolver);
+        }
         if (currentRequest.hasEntity()) {
             ByteArrayOutputStream outStream = new ByteArrayOutputStream();
             currentRequest.setStreamProvider(contentLength -> outStream);
@@ -518,6 +578,9 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
             if (sslRefresher != null) {
                 sslRefresher.shutdownNow();
             }
+            if (createdEventLoopGroup && eventLoopGroup != null && 
!eventLoopGroup.isShutdown()) {
+                eventLoopGroup.shutdownGracefully();
+            }
         } catch (IOException e) {
             log.warn("Failed to close http client", e);
         }
@@ -556,4 +619,21 @@ public class AsyncHttpConnector implements Connector, 
AsyncHttpRequestExecutor {
         }
     }
 
+    @Data
+    private static class SharedResourceHolder {
+        static final SharedResourceHolder EMPTY = new 
SharedResourceHolder(null, null, false);
+
+        final NameResolver<InetAddress> nameResolver;
+        final EventLoopGroup eventLoopGroup;
+        final boolean createEventLoop;
+
+        SharedResourceHolder(NameResolver<InetAddress> nameResolver,
+                             EventLoopGroup eventLoopGroup,
+                             boolean createEventLoop) {
+            this.nameResolver = nameResolver;
+            this.eventLoopGroup = eventLoopGroup;
+            this.createEventLoop = createEventLoop;
+        }
+    }
+
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
index d20dc848494..caaa356f7c7 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin.internal.http;
 
 import javax.ws.rs.client.Client;
 import javax.ws.rs.core.Configuration;
+import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.glassfish.jersey.client.spi.Connector;
 import org.glassfish.jersey.client.spi.ConnectorProvider;
@@ -51,8 +52,8 @@ public class AsyncHttpConnectorProvider implements 
ConnectorProvider {
 
 
     public AsyncHttpConnector getConnector(int connectTimeoutMs, int 
readTimeoutMs, int requestTimeoutMs,
-            int autoCertRefreshTimeSeconds) {
+            int autoCertRefreshTimeSeconds, PulsarClientSharedResourcesImpl 
sharedResources) {
         return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, 
requestTimeoutMs, autoCertRefreshTimeSeconds,
-                conf, acceptGzipCompression);
+                conf, acceptGzipCompression, sharedResources);
     }
 }
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
index 9339c893492..987b054426b 100644
--- 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
@@ -23,10 +23,16 @@ import static 
org.assertj.core.api.Assertions.assertThatIllegalArgumentException
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
+import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.NameResolver;
+import io.netty.util.Timer;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -35,6 +41,7 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.testng.Assert;
@@ -188,6 +195,95 @@ public class PulsarAdminBuilderImplTest {
                 
PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080";).description("forked").build();
     }
 
+    @Test
+    public void testClientBuildWithSharedResources() throws 
PulsarClientException {
+        PulsarClientSharedResources sharedResources = 
PulsarClientSharedResources.builder()
+                .configureEventLoop(eventLoopGroupConfig -> {
+                    eventLoopGroupConfig
+                            .name("testEventLoop")
+                            .numberOfThreads(20);
+                })
+                .configureDnsResolver(dnsResolverConfig -> {
+                    dnsResolverConfig.localAddress(new InetSocketAddress(0));
+                })
+                .configureTimer(timerConfig -> {
+                    timerConfig.name("testTimer").tickDuration(100, 
TimeUnit.MILLISECONDS);
+                })
+                .build();
+        // create two adminClients and check if they share the same event loop 
group and netty timer
+        @Cleanup
+        PulsarAdminImpl pulsarAdminImpl1 =
+                (PulsarAdminImpl) PulsarAdmin.builder()
+                        .serviceHttpUrl("http://localhost:8080";)
+                        .sharedResources(sharedResources)
+                        .build();
+        @Cleanup
+        PulsarAdminImpl pulsarAdminImpl2 =
+                (PulsarAdminImpl) PulsarAdmin.builder()
+                        .serviceHttpUrl("http://localhost:8080";)
+                        .sharedResources(sharedResources)
+                        .build();
+
+        EventLoopGroup eventLoopGroup1 =
+                
pulsarAdminImpl1.getAsyncHttpConnector().getHttpClient().getConfig().getEventLoopGroup();
+        EventLoopGroup eventLoopGroup2 =
+                
pulsarAdminImpl2.getAsyncHttpConnector().getHttpClient().getConfig().getEventLoopGroup();
+        Timer nettyTimer1 = 
pulsarAdminImpl1.getAsyncHttpConnector().getHttpClient().getConfig().getNettyTimer();
+        Timer nettyTimer2 = 
pulsarAdminImpl2.getAsyncHttpConnector().getHttpClient().getConfig().getNettyTimer();
+        assertThat(eventLoopGroup1).isSameAs(eventLoopGroup2);
+        assertThat(nettyTimer1).isSameAs(nettyTimer2);
+        sharedResources.close();
+    }
+
+    @Test
+    public void testClientBuildWithSharedDnsResolverOnly() throws 
PulsarClientException {
+        PulsarClientSharedResources sharedResources = 
PulsarClientSharedResources.builder()
+                .shareConfigured()
+                .configureDnsResolver(dnsResolverConfig -> {
+                    dnsResolverConfig.localAddress(new InetSocketAddress(0));
+                })
+                .build();
+
+        @Cleanup
+        PulsarAdminImpl pulsarAdminImpl1 =
+                (PulsarAdminImpl) PulsarAdmin.builder()
+                        .serviceHttpUrl("http://localhost:8080";)
+                        .sharedResources(sharedResources)
+                        .build();
+        @Cleanup
+        PulsarAdminImpl pulsarAdminImpl2 =
+                (PulsarAdminImpl) PulsarAdmin.builder()
+                        .serviceHttpUrl("http://localhost:8080";)
+                        .sharedResources(sharedResources)
+                        .build();
+
+        EventLoopGroup eventLoopGroup1 =
+                
pulsarAdminImpl1.getAsyncHttpConnector().getHttpClient().getConfig().getEventLoopGroup();
+        EventLoopGroup eventLoopGroup2 =
+                
pulsarAdminImpl2.getAsyncHttpConnector().getHttpClient().getConfig().getEventLoopGroup();
+        Timer nettyTimer1 = 
pulsarAdminImpl1.getAsyncHttpConnector().getHttpClient().getConfig().getNettyTimer();
+        Timer nettyTimer2 = 
pulsarAdminImpl2.getAsyncHttpConnector().getHttpClient().getConfig().getNettyTimer();
+        NameResolver<InetAddress> nameResolver1 = 
pulsarAdminImpl1.getAsyncHttpConnector().getNameResolver();
+        NameResolver<InetAddress> nameResolver2 = 
pulsarAdminImpl2.getAsyncHttpConnector().getNameResolver();
+
+        // test eventLoop will be created when dnsResolver is configured
+        assertThat(eventLoopGroup1).isNotNull();
+        assertThat(eventLoopGroup2).isNotNull();
+        assertThat(eventLoopGroup2).isNotSameAs(eventLoopGroup1);
+
+        // timer will not be created when timer is not configured
+        assertThat(nettyTimer1).isSameAs(nettyTimer2).isNull();
+
+        assertThat(nameResolver1).isNotNull();
+        assertThat(nameResolver2).isNotNull();
+
+        // test eventLoop will shut down when AsyncHttpConnector is closed
+        pulsarAdminImpl1.getAsyncHttpConnector().close();
+        assertThat(eventLoopGroup1.isShuttingDown()).isTrue();
+
+        sharedResources.close();
+    }
+
     @Test
     public void testClientDescriptionLengthExceed64() {
         String longDescription = "a".repeat(65);
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
index f8518b59310..e0b4b13a03d 100644
--- 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
@@ -176,7 +176,7 @@ public class AsyncHttpConnectorTest {
         };
         @Cleanup
         AsyncHttpConnector connector = new AsyncHttpConnector(5000, 
requestTimeout,
-                requestTimeout, 0, conf, false) {
+                requestTimeout, 0, conf, false, null) {
             @Override
             protected CompletableFuture<Response> oneShot(InetSocketAddress 
host, ClientRequest request) {
                 // delay the response to simulate a timeout
@@ -226,7 +226,7 @@ public class AsyncHttpConnectorTest {
 
         @Cleanup
         AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
-                5000, 0, conf, false);
+                5000, 0, conf, false, null);
 
         Request request = new RequestBuilder("GET")
                 .setUrl("http://localhost:"; + server.port() + 
"/admin/v2/clusters")
@@ -272,7 +272,7 @@ public class AsyncHttpConnectorTest {
 
         @Cleanup
         AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
-                5000, 0, conf, false);
+                5000, 0, conf, false, null);
 
         Request request = new RequestBuilder("GET")
                 .setUrl("http://localhost:"; + server.port() + "/path1")
@@ -298,7 +298,7 @@ public class AsyncHttpConnectorTest {
 
         @Cleanup
         AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
-                5000, 0, conf, false);
+                5000, 0, conf, false, null);
 
         Request request = new RequestBuilder("POST")
                 .setUrl("http://localhost:"; + server.port() + "/path1")
@@ -322,7 +322,7 @@ public class AsyncHttpConnectorTest {
 
         @Cleanup
         AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
-                5000, 0, conf, false);
+                5000, 0, conf, false, null);
 
         Request request = new RequestBuilder("POST")
                 .setUrl("http://localhost:"; + server.port() + 
"/concurrency-test")

Reply via email to