This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8559b689fc6 [feat][client] Implement PIP-234 for sharing thread pools 
and DNS resolver/cache across multiple Pulsar Client instances (#24790)
8559b689fc6 is described below

commit 8559b689fc6cdca2d6e13aeffb56996fc46a6f04
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Sep 28 02:21:51 2025 +0300

    [feat][client] Implement PIP-234 for sharing thread pools and DNS 
resolver/cache across multiple Pulsar Client instances (#24790)
---
 build/run_unit_group.sh                            |   5 +-
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  15 +-
 .../broker/service/ExclusiveProducerTest.java      |   2 +
 .../broker/service/TopicTerminationTest.java       |   2 +
 .../pulsar/client/api/ConsumerCleanupTest.java     |   2 +
 .../pulsar/client/api/ProducerCleanupTest.java     |   2 +
 ...MultiListenersWithInternalListenerNameTest.java |   8 +-
 .../client/api/SimpleProducerConsumerTest.java     |  58 +++++
 .../pulsar/client/impl/PulsarTestClient.java       |   2 +-
 .../pulsar/client/impl/TopicDoesNotExistsTest.java |   4 +-
 .../apache/pulsar/client/api/ClientBuilder.java    |  14 +
 .../pulsar/client/api/DnsResolverConfig.java       | 126 +++++++++
 .../pulsar/client/api/EventLoopGroupConfig.java    |  28 +-
 .../client/api/PulsarClientSharedResources.java    | 127 +++++++++
 .../api/PulsarClientSharedResourcesBuilder.java    | 122 +++++++++
 .../apache/pulsar/client/api/ThreadPoolConfig.java |  48 ++++
 .../org/apache/pulsar/client/api/TimerConfig.java  |  35 ++-
 .../PulsarClientImplementationBinding.java         |   3 +
 .../pulsar/client/impl/AutoClusterFailover.java    |  11 +-
 .../pulsar/client/impl/ClientBuilderImpl.java      |  20 +-
 .../apache/pulsar/client/impl/ConnectionPool.java  |   2 +-
 .../client/impl/ControlledClusterFailover.java     |  26 +-
 .../pulsar/client/impl/DnsResolverGroupImpl.java   |  54 +++-
 .../org/apache/pulsar/client/impl/HttpClient.java  |  14 +-
 .../pulsar/client/impl/HttpLookupService.java      |  14 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  67 ++---
 .../PulsarClientImplementationBindingImpl.java     |   6 +
 .../impl/PulsarClientResourcesConfigurer.java      | 149 +++++++++++
 .../PulsarClientSharedResourcesBuilderImpl.java    | 283 +++++++++++++++++++++
 .../impl/PulsarClientSharedResourcesImpl.java      | 160 ++++++++++++
 .../pulsar/client/util/ExecutorProvider.java       |   6 +-
 .../client/util/ScheduledExecutorProvider.java     |   3 +
 .../src/main/resources/findbugsExclude.xml         |   9 +
 .../client/impl/ControlledClusterFailoverTest.java |   6 +
 ...PulsarClientSharedResourcesBuilderImplTest.java | 136 ++++++++++
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 118 ++++++++-
 .../pulsar/common/util/netty/EventLoopUtil.java    |  76 +++++-
 .../pulsar/common/util/netty/DnsResolverTest.java  |   1 +
 .../common/util/netty/DnsResolverUtilTest.java     |  68 +++++
 40 files changed, 1727 insertions(+), 107 deletions(-)

diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 02347648b94..cfdf94d6436 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -177,10 +177,13 @@ function test_group_other() {
                    **/OffloadersCacheTest.java
                   **/PrimitiveSchemaTest.java,
                   **/BlobStoreManagedLedgerOffloaderTest.java,
-                  **/BlobStoreManagedLedgerOffloaderStreamingTest.java'
+                  **/BlobStoreManagedLedgerOffloaderStreamingTest.java,
+                  **/DnsResolverTest.java'
 
   mvn_test -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
                                                   **/OffloadersCacheTest.java'
+  # DnsResolverTest needs to be run separately since it relies on static field 
values
+  mvn_test -pl pulsar-common -Dinclude='**/DnsResolverTest.java'
 
   mvn_test -pl tiered-storage/jcloud 
-Dinclude='**/BlobStoreManagedLedgerOffloaderTest.java'
   mvn_test -pl tiered-storage/jcloud 
-Dinclude='**/BlobStoreManagedLedgerOffloaderStreamingTest.java'
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aa5c0b0ad2d..15fe1dcd610 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -401,7 +401,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         this.brokerClientSharedLookupExecutorProvider =
                 new ScheduledExecutorProvider(1, 
"broker-client-shared-lookup-executor");
         this.brokerClientSharedDnsResolverGroup =
-                new DnsResolverGroupImpl(this.ioEventLoopGroup,
+                new DnsResolverGroupImpl(
                         loadBrokerClientProperties(new 
ClientConfigurationData()));
 
         // here in the constructor we don't have the offloader scheduler yet
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 6fdfee86b9f..b349ba71922 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -182,7 +182,15 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
     }
 
     private URI resolveLookupUrl() {
-        if (isTcpLookup) {
+        return resolveLookupUrl(isTcpLookup);
+    }
+
+    protected String brokerServiceUrl(boolean usePulsarBinaryProtocol) {
+        return resolveLookupUrl(usePulsarBinaryProtocol).toString();
+    }
+
+    private URI resolveLookupUrl(boolean usePulsarBinaryProtocol) {
+        if (usePulsarBinaryProtocol) {
             return URI.create(pulsar.getBrokerServiceUrl());
         } else {
             return URI.create(brokerUrl != null
@@ -801,5 +809,10 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         BrokerTestUtil.logTopicStats(log, admin, topic);
     }
 
+    @DataProvider(name = "trueFalse")
+    public static Object[][] trueFalse() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 9a070efa95d..c108d5b8ca1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -49,6 +49,8 @@ public class ExclusiveProducerTest extends BrokerTestBase {
 
     @BeforeClass
     protected void setup() throws Exception {
+        // use Pulsar binary lookup since the HTTP client shares the Pulsar 
client timer
+        isTcpLookup = true;
         baseSetup();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index a2436e4360e..e4e361eb0b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -56,6 +56,8 @@ public class TopicTerminationTest extends BrokerTestBase {
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        // use Pulsar binary lookup since the HTTP client shares the Pulsar 
client timer
+        isTcpLookup = true;
         super.baseSetup();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
index cc7c2cabb87..0cbfde06381 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
@@ -34,6 +34,8 @@ public class ConsumerCleanupTest extends ProducerConsumerBase 
{
     @BeforeClass
     @Override
     protected void setup() throws Exception {
+        // use Pulsar binary lookup since the HTTP client shares the Pulsar 
client timer
+        isTcpLookup = true;
         super.internalSetup();
         super.producerBaseSetup();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
index 5ad3c85441b..f4daba3bf05 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
@@ -33,6 +33,8 @@ public class ProducerCleanupTest extends ProducerConsumerBase 
{
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        // use Pulsar binary lookup since the HTTP client shares the Pulsar 
client timer
+        isTcpLookup = true;
         super.internalSetup();
         super.producerBaseSetup();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
index 094c32c7ae6..2844d211a30 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -22,6 +22,8 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.resolver.DefaultNameResolver;
+import io.netty.util.concurrent.ImmediateEventExecutor;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.InetAddress;
@@ -137,7 +139,8 @@ public class 
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
         conf.setMaxLookupRedirects(10);
 
         @Cleanup
-        LookupService lookupService = useHttp ? new 
HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors) :
+        LookupService lookupService = useHttp ? new 
HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors,
+                null, new 
DefaultNameResolver(ImmediateEventExecutor.INSTANCE)) :
                 new BinaryProtoLookupService((PulsarClientImpl) 
this.pulsarClient,
                 lookupUrl.toString(), "internal", false, this.executorService);
         TopicName topicName = 
TopicName.get("persistent://public/default/test");
@@ -173,7 +176,8 @@ public class 
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
         conf.setMaxLookupRedirects(10);
 
         @Cleanup
-        HttpLookupService lookupService = new 
HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors);
+        HttpLookupService lookupService = new 
HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors, null,
+                new DefaultNameResolver(ImmediateEventExecutor.INSTANCE));
         NamespaceService namespaceService = pulsar.getNamespaceService();
 
         LookupResult lookupResult = new 
LookupResult(pulsar.getWebServiceAddress(), null,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2b1d83be591..4e15062b1e2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -42,6 +42,7 @@ import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.Timeout;
 import java.io.ByteArrayInputStream;
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -5304,4 +5305,61 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         producer.close();
         consumer.close();
     }
+
+    @Test(dataProvider = "trueFalse")
+    public void testResourceSharingEndToEnd(boolean usePulsarBinaryProtocol) 
throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic");
+        int numberOfClients = 50;
+        List<PulsarClient> clients = new ArrayList<>();
+        @Cleanup
+        Closeable closeClients = () -> {
+            for (PulsarClient client : clients) {
+                try {
+                    client.close();
+                } catch (PulsarClientException e) {
+                    log.error("Failed to close client {}", client, e);
+                }
+            }
+        };
+        @Cleanup
+        PulsarClientSharedResources sharedResources = 
PulsarClientSharedResources.builder().build();
+        List<Consumer<byte[]>> consumers = new ArrayList<>();
+
+        for (int i = 0; i < numberOfClients; i++) {
+            PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(brokerServiceUrl(usePulsarBinaryProtocol))
+                    .sharedResources(sharedResources)
+                    .build();
+            clients.add(client);
+            consumers.add(client.newConsumer().topic(topic)
+                    .subscriptionName("my-subscriber-name" + i).subscribe());
+        }
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic(topic);
+
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        for (Consumer<byte[]> consumer : consumers) {
+            Message<byte[]> msg = null;
+            Set<String> messageSet = new HashSet<>();
+            for (int i = 0; i < 10; i++) {
+                msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+                String receivedMessage = new String(msg.getData());
+                log.debug("Received message: [{}]", receivedMessage);
+                String expectedMessage = "my-message-" + i;
+                testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+            }
+            // Acknowledge the consumption of all messages at once
+            consumer.acknowledgeCumulative(msg);
+            consumer.close();
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index 27301db9179..db330951f74 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -91,7 +91,7 @@ public class PulsarTestClient extends PulsarClientImpl {
     private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool,
                              AtomicReference<Supplier<ClientCnx>> 
clientCnxSupplierReference)
             throws PulsarClientException {
-        super(conf, eventLoopGroup, cnxPool);
+        super(conf, eventLoopGroup, cnxPool, null, null, null, null, null, new 
DnsResolverGroupImpl(conf));
         // workaround initialization order issue so that ClientCnx can be 
created in this class
         clientCnxSupplierReference.set(this::createClientCnx);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
index b48b97978b7..f528c827549 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -39,6 +39,8 @@ public class TopicDoesNotExistsTest extends 
ProducerConsumerBase {
     @Override
     @BeforeClass
     public void setup() throws Exception {
+        // use Pulsar binary lookup since the HTTP client shares the Pulsar 
client timer
+        isTcpLookup = true;
         conf.setAllowAutoTopicCreation(false);
         super.internalSetup();
         super.producerBaseSetup();
@@ -61,7 +63,7 @@ public class TopicDoesNotExistsTest extends 
ProducerConsumerBase {
                     .create();
             Assert.fail("Create producer should failed while topic does not 
exists.");
         } catch (PulsarClientException e) {
-            Assert.assertTrue(e instanceof 
PulsarClientException.NotFoundException);
+            Assert.assertTrue(e instanceof 
PulsarClientException.TopicDoesNotExistException);
         }
         Thread.sleep(2000);
         HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 034c167a9b3..d31d42bbe63 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -734,4 +734,18 @@ public interface ClientBuilder extends Serializable, 
Cloneable {
      * @throws IllegalArgumentException if the length of description exceeds 64
      */
     ClientBuilder description(String description);
+
+    /**
+     * Provide a set of shared client resources to be reused by this client.
+     * <p>
+     * Providing a shared resource instance allows PulsarClient instances to 
share resources
+     * (such as IO/event loops, timers, executors, DNS resolver/cache) with 
other PulsarClient
+     * instances, reducing memory footprint and thread usage when creating 
many clients in the same JVM.
+     *
+     * @param sharedResources the shared resources instance created with 
{@link PulsarClientSharedResources#builder()}
+     * @return the client builder instance
+     * @see PulsarClientSharedResources
+     * @see PulsarClientSharedResourcesBuilder
+     */
+    ClientBuilder sharedResources(PulsarClientSharedResources sharedResources);
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DnsResolverConfig.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DnsResolverConfig.java
new file mode 100644
index 00000000000..09c8f5fccfc
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DnsResolverConfig.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Configuration interface for DNS resolver settings.
+ */
+public interface DnsResolverConfig {
+    /**
+     * Sets the minimum TTL for DNS records.
+     * Defaults to 0.
+     *
+     * @param minTtl the minimum TTL value
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig minTtl(int minTtl);
+
+    /**
+     * Sets the maximum TTL for DNS records.
+     * Defaults to JDK level setting.
+     *
+     * @param maxTtl the maximum TTL value
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig maxTtl(int maxTtl);
+
+    /**
+     * Sets the TTL for negative DNS responses.
+     * Defaults to JDK level setting.
+     *
+     * @param negativeTtl the negative TTL value
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig negativeTtl(int negativeTtl);
+
+    /**
+     * Sets the query timeout in milliseconds.
+     * Defaults to the OS level setting.
+     *
+     * @param queryTimeoutMillis the timeout value in milliseconds
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig queryTimeoutMillis(long queryTimeoutMillis);
+
+    /**
+     * Enables or disables TCP fallback for DNS queries.
+     * Defaults to true.
+     *
+     * @param tcpFallbackEnabled true to enable TCP fallback, false to disable
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig tcpFallbackEnabled(boolean tcpFallbackEnabled);
+
+    /**
+     * Enables or disables TCP fallback on timeout for DNS queries.
+     * Defaults to true.
+     *
+     * @param tcpFallbackOnTimeoutEnabled true to enable TCP fallback on 
timeout, false to disable
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig tcpFallbackOnTimeoutEnabled(boolean 
tcpFallbackOnTimeoutEnabled);
+
+    /**
+     * Sets the ndots value for DNS resolution. Set this to 1 to disable the 
use of DNS search domains.
+     * Defaults to the OS level configuration.
+     *
+     * @param ndots the ndots value
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig ndots(int ndots);
+
+    /**
+     * Sets the search domains for DNS resolution.
+     * Defaults to the OS level configuration.
+     *
+     * @param searchDomains collection of search domains
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig searchDomains(Iterable<String> searchDomains);
+
+    /**
+     * Sets the local bind address and port for DNS lookup client.
+     * By default any available port is used. This setting is mainly in cases 
where it's necessary to bind
+     * the DNS client to a specific address and optionally also the port.
+     *
+     * @param localAddress the socket address (address and port) to bind the 
DNS client to
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig localAddress(InetSocketAddress localAddress);
+
+    /**
+     * Sets the DNS server addresses to be used for DNS lookups.
+     * Defaults to the OS level configuration.
+     *
+     * @param addresses collection of DNS server socket addresses (address and 
port)
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig serverAddresses(Iterable<InetSocketAddress> addresses);
+
+    /**
+     * Enable detailed trace information for resolution failure exception 
messages.
+     * Defaults to true.
+     *
+     * @param traceEnabled true to enable tracing, false to disable
+     * @return the DNS resolver configuration instance for chained calls
+     */
+    DnsResolverConfig traceEnabled(boolean traceEnabled);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EventLoopGroupConfig.java
similarity index 60%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
copy to 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EventLoopGroupConfig.java
index 3ebd41ebe56..84b8b78ee41 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EventLoopGroupConfig.java
@@ -16,21 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.util;
+package org.apache.pulsar.client.api;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ScheduledExecutorProvider extends ExecutorProvider {
-
-    public ScheduledExecutorProvider(int numThreads, String poolName) {
-        super(numThreads, poolName);
-    }
-
-    @Override
-    protected ExecutorService createExecutor(ExtendedThreadFactory 
threadFactory) {
-        return Executors.newSingleThreadScheduledExecutor(threadFactory);
-    }
+/**
+ * Configuration interface for event loop group settings.
+ */
+public interface EventLoopGroupConfig extends 
ThreadPoolConfig<EventLoopGroupConfig> {
+    /**
+     * Enables or disables busy-wait polling mode.
+     *
+     * @param enableBusyWait true to enable busy-wait polling, false to disable
+     * @return this config instance for method chaining
+     */
+    EventLoopGroupConfig enableBusyWait(boolean enableBusyWait);
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
new file mode 100644
index 00000000000..34a56ebd608
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Set;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+/**
+ * Manages shared resources across multiple PulsarClient instances to optimize 
resource utilization.
+ * This interface provides access to common resources such as thread pools and 
DNS resolver/cache
+ * that can be shared between multiple PulsarClient instances.
+ *
+ * This allows creating a large number of PulsarClient instances in a single 
JVM without wasting resources.
+ * Sharing the DNS resolver and cache reduces the number of DNS 
lookups performed by the clients,
+ * which improves latency and reduces load on the DNS servers.
+ *
+ * <p>Key features:
+ * <ul>
+ *   <li>Thread pool sharing for various Pulsar operations (IO, timer, lookup, 
etc.)</li>
+ *   <li>Shared DNS resolver and cache</li>
+ *   <li>Resource lifecycle management</li>
+ * </ul>
+ *
+ * <p>Usage example:
+ * <pre>{@code
+ * // To share all possible resources across multiple PulsarClient instances
+ * PulsarClientSharedResources sharedResources = 
PulsarClientSharedResources.builder()
+ *     .build();
+ * // Use these resources with multiple PulsarClient instances
+ * PulsarClient client1 = 
PulsarClient.builder().sharedResources(sharedResources).build();
+ * PulsarClient client2 = 
PulsarClient.builder().sharedResources(sharedResources).build();
+ * client1.close();
+ * client2.close();
+ * sharedResources.close();
+ * }</pre>
+ *
+ * <p>The instance should be created using the {@link 
PulsarClientSharedResourcesBuilder},
+ * which can be obtained via the {@link #builder()} method.
+ *
+ * @see PulsarClientSharedResourcesBuilder
+ * @see SharedResource
+ */
+public interface PulsarClientSharedResources extends AutoCloseable {
+    enum SharedResource {
+        // pulsar-io threadpool
+        EventLoopGroup(SharedResourceType.EventLoopGroup),
+        // pulsar-external-listener threadpool
+        ListenerExecutor(SharedResourceType.ThreadPool),
+        // pulsar-timer threadpool
+        Timer(SharedResourceType.Timer),
+        // pulsar-client-internal threadpool
+        InternalExecutor(SharedResourceType.ThreadPool),
+        // pulsar-client-scheduled threadpool
+        ScheduledExecutor(SharedResourceType.ThreadPool),
+        // pulsar-lookup threadpool
+        LookupExecutor(SharedResourceType.ThreadPool),
+        // DNS resolver and cache that must be shared together with 
eventLoopGroup
+        DnsResolver(SharedResourceType.DnsResolver);
+
+        private final SharedResourceType type;
+
+        SharedResource(SharedResourceType type) {
+            this.type = type;
+        }
+
+        public SharedResourceType getType() {
+            return type;
+        }
+    }
+
+    enum SharedResourceType {
+        EventLoopGroup,
+        ThreadPool,
+        Timer,
+        DnsResolver;
+    }
+
+    /**
+     * Checks if a resource type is contained in the shared resources instance.
+     *
+     * @param sharedResource the type of resource to check
+     * @return true if the resource type is contained in this instance, false 
otherwise
+     */
+    boolean contains(SharedResource sharedResource);
+
+    /**
+     * Gets all resource types contained in this shared resources instance.
+     *
+     * @return set of resource types available in this instance
+     */
+    Set<SharedResource> getSharedResources();
+
+    /**
+     * Creates a new builder for constructing instances of {@link 
PulsarClientSharedResources}.
+     *
+     * @return a new {@link PulsarClientSharedResourcesBuilder} which can be 
used to configure
+     *         and build shared resources for use across multiple PulsarClient 
instances.
+     */
+    static PulsarClientSharedResourcesBuilder builder() {
+        return 
DefaultImplementation.getDefaultImplementation().newSharedResourcesBuilder();
+    }
+
+    /**
+     * Closes this instance of PulsarClientSharedResources and releases all 
associated resources.
+     * All PulsarClient instances using these shared resources must be closed 
before calling this method.
+     *
+     * @throws PulsarClientException if there is an error while closing the 
shared resources
+     */
+    @Override
+    void close() throws PulsarClientException;
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
new file mode 100644
index 00000000000..3a94d5352b5
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static 
org.apache.pulsar.client.api.PulsarClientSharedResources.SharedResource;
+import java.util.Collection;
+import java.util.function.Consumer;
+
+/**
+ * Builder for configuring and creating {@link PulsarClientSharedResources}.
+ * <p>
+ * Shared resources allow multiple PulsarClient instances to reuse common 
components
+ * (for example executors, timers, DNS resolver, event loop, connection pool),
+ * reducing memory footprint and thread count when many clients are created in 
the same JVM.
+ * This allows creating a large number of PulsarClient instances in a single 
JVM without wasting resources.
+ * Sharing the DNS resolver and cache will help reduce the number of DNS 
lookups performed by the clients
+ * and improve latency and reduce load on the DNS servers.
+ * <p>
+ * Typical usage:
+ * <pre>{@code
+ * PulsarClientSharedResources shared = PulsarClientSharedResources
+ *     .builder()
+ *     // shared resources can be configured using a ClientBuilder
+ *     .configureResources(PulsarClient.builder().ioThreads(4))
+ *     .build();
+ *
+ * PulsarClient client = PulsarClient.builder()
+ *     .sharedResources(shared)
+ *     .serviceUrl("pulsar://localhost:6650")
+ *     .build();
+ *
+ * client.close();
+ * // it's necessary to close the shared resources after usage
+ * shared.close();
+ * }</pre>
+ */
+public interface PulsarClientSharedResourcesBuilder {
+    /**
+     * Optionally limits the shared resource types to be created and exposed 
by the resulting
+     * {@link PulsarClientSharedResources}. If this method isn't called, 
resources for all resource types
+     * will be created and exposed.
+     * Calling this method is additive and the builder will accumulate the 
shared resource types from all method calls.
+     *
+     * @param sharedResource one or more resource types to include
+     * @return this builder
+     */
+    PulsarClientSharedResourcesBuilder resourceTypes(SharedResource... 
sharedResource);
+
+    /**
+     * @param sharedResource one or more resource types to include
+     * @return this builder
+     * @see #resourceTypes(SharedResource...)
+     */
+    PulsarClientSharedResourcesBuilder 
resourceTypes(Collection<SharedResource> sharedResource);
+
+    /**
+     * Share only the configured resources. It's not allowed to use {@link 
#resourceTypes(SharedResource...)} when
+     * this method is called.
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder shareConfigured();
+
+    /**
+     * Builds the {@link PulsarClientSharedResources} instance. It is 
necessary to call
+     * {@link PulsarClientSharedResources#close()} on the instance to release 
resources. All clients must be closed
+     * before closing the shared resources.
+     *
+     * @return the shared resources to be used in {@link 
ClientBuilder#sharedResources(PulsarClientSharedResources)}
+     */
+    PulsarClientSharedResources build();
+
+    /**
+     * Configures a thread pool for the specified shared resource type.
+     *
+     * @param sharedResource the type of shared resource to configure
+     * @param configurer     a consumer that configures the thread pool 
settings
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder configureThreadPool(SharedResource 
sharedResource,
+                                                           
Consumer<ThreadPoolConfig<?>> configurer);
+
+    /**
+     * Configures the event loop group settings.
+     *
+     * @param configurer a consumer that configures the event loop group 
settings
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder 
configureEventLoop(Consumer<EventLoopGroupConfig> configurer);
+
+    /**
+     * Configures the DNS resolver settings.
+     *
+     * @param configurer a consumer that configures the DNS resolver settings
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder 
configureDnsResolver(Consumer<DnsResolverConfig> configurer);
+
+    /**
+     * Configures the timer settings.
+     *
+     * @param configurer a consumer that configures the timer settings
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig> 
configurer);
+
+}
\ No newline at end of file
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ThreadPoolConfig.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ThreadPoolConfig.java
new file mode 100644
index 00000000000..dd57a6cebfe
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ThreadPoolConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Configuration interface for thread pool settings.
+ */
+public interface ThreadPoolConfig<T extends ThreadPoolConfig<T>> {
+    /**
+     * Sets the name of the thread pool.
+     *
+     * @param name the name to set for the thread pool
+     * @return this config instance for method chaining
+     */
+    T name(String name);
+
+    /**
+     * Sets the number of threads in the thread pool.
+     *
+     * @param numberOfThreads the number of threads to use
+     * @return this config instance for method chaining
+     */
+    T numberOfThreads(int numberOfThreads);
+
+    /**
+     * Sets whether the threads should be daemon threads.
+     *
+     * @param daemon true if the threads should be daemon threads, false 
otherwise
+     * @return this config instance for method chaining
+     */
+    T daemon(boolean daemon);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
 b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TimerConfig.java
similarity index 54%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
copy to 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TimerConfig.java
index 3ebd41ebe56..1c1235704df 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TimerConfig.java
@@ -16,21 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.util;
+package org.apache.pulsar.client.api;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import lombok.extern.slf4j.Slf4j;
+import java.util.concurrent.TimeUnit;
 
-@Slf4j
-public class ScheduledExecutorProvider extends ExecutorProvider {
-
-    public ScheduledExecutorProvider(int numThreads, String poolName) {
-        super(numThreads, poolName);
-    }
+/**
+ * Configuration interface for timer settings.
+ */
+public interface TimerConfig {
+    /**
+     * Sets the name of the timer.
+     *
+     * @param name the name to set for the timer
+     * @return this config instance for method chaining
+     */
+    TimerConfig name(String name);
 
-    @Override
-    protected ExecutorService createExecutor(ExtendedThreadFactory 
threadFactory) {
-        return Executors.newSingleThreadScheduledExecutor(threadFactory);
-    }
+    /**
+     * Sets the tick duration for the timer.
+     *
+     * @param tickDuration the duration of each tick
+     * @param timeUnit     the time unit for the tick duration
+     * @return this config instance for method chaining
+     */
+    TimerConfig tickDuration(long tickDuration, TimeUnit timeUnit);
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index 8fd05bff265..b5f2a3a468e 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessagePayloadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResourcesBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -255,4 +256,6 @@ public interface PulsarClientImplementationBinding {
                                  Map<String, String> propertiesValue);
 
     TopicMessageId newTopicMessageId(String topic, MessageId messageId);
+
+    PulsarClientSharedResourcesBuilder newSharedResourcesBuilder();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 844d1e2d253..7615ec9f810 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.base.Strings;
+import io.netty.resolver.AddressResolver;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -63,6 +64,7 @@ public class AutoClusterFailover implements 
ServiceUrlProvider {
     private final long intervalMs;
     private static final int TIMEOUT = 30_000;
     private final PulsarServiceNameResolver resolver;
+    private AddressResolver<InetSocketAddress> addressResolver;
 
     private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) {
         this.primary = builder.primary;
@@ -86,6 +88,7 @@ public class AutoClusterFailover implements 
ServiceUrlProvider {
     @Override
     public void initialize(PulsarClient client) {
         this.pulsarClient = (PulsarClientImpl) client;
+        this.addressResolver = pulsarClient.getAddressResolver();
         ClientConfigurationData config = pulsarClient.getConfiguration();
         if (config != null) {
             this.primaryAuthentication = config.getAuthentication();
@@ -128,8 +131,14 @@ public class AutoClusterFailover implements 
ServiceUrlProvider {
         try {
             resolver.updateServiceUrl(url);
             InetSocketAddress endpoint = resolver.resolveHost();
+            if (!endpoint.isUnresolved()) {
+                // create an unresolved endpoint to ensure that DNS lookup is 
performed again
+                endpoint = 
InetSocketAddress.createUnresolved(endpoint.getHostString(), 
endpoint.getPort());
+            }
+            // Use Netty's DNS resolver and settings to resolve the host name
+            InetSocketAddress resolvedEndpoint = 
addressResolver.resolve(endpoint).get(TIMEOUT, TimeUnit.MILLISECONDS);
             try (Socket socket = new Socket()) {
-                socket.connect(new InetSocketAddress(endpoint.getHostName(), 
endpoint.getPort()), TIMEOUT);
+                socket.connect(resolvedEndpoint, TIMEOUT);
             }
             return true;
         } catch (Exception e) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index d031c8e1710..5b3a52d5e42 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
@@ -43,7 +44,9 @@ import org.apache.pulsar.common.tls.InetAddressUtils;
 import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
 
 public class ClientBuilderImpl implements ClientBuilder {
+    private static final long serialVersionUID = 1L;
     ClientConfigurationData conf;
+    private transient PulsarClientSharedResourcesImpl sharedResources;
 
     public ClientBuilderImpl() {
         this(new ClientConfigurationData());
@@ -68,12 +71,19 @@ public class ClientBuilderImpl implements ClientBuilder {
         if (conf.getAuthentication() == null || conf.getAuthentication() == 
AuthenticationDisabled.INSTANCE) {
             setAuthenticationFromPropsIfAvailable(conf);
         }
-        return new PulsarClientImpl(conf);
+        PulsarClientImpl.PulsarClientImplBuilder instanceBuilder = 
PulsarClientImpl.builder();
+        instanceBuilder.conf(conf);
+        if (sharedResources != null) {
+            sharedResources.applyTo(instanceBuilder);
+        }
+        return instanceBuilder.build();
     }
 
     @Override
     public ClientBuilder clone() {
-        return new ClientBuilderImpl(conf.clone());
+        ClientBuilderImpl clientBuilder = new ClientBuilderImpl(conf.clone());
+        clientBuilder.sharedResources = sharedResources;
+        return clientBuilder;
     }
 
     @Override
@@ -486,6 +496,12 @@ public class ClientBuilderImpl implements ClientBuilder {
         return this;
     }
 
+    @Override
+    public ClientBuilder sharedResources(PulsarClientSharedResources 
sharedResources) {
+        this.sharedResources = (PulsarClientSharedResourcesImpl) 
sharedResources;
+        return this;
+    }
+
     @Override
     public ClientBuilder lookupProperties(Map<String, String> properties) {
         conf.setLookupProperties(properties);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index f8412b5bf29..fea599ea408 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -191,7 +191,7 @@ public class ConnectionPool implements AutoCloseable {
     private Supplier<AddressResolver<InetSocketAddress>> 
createAddressResolver(ClientConfigurationData conf,
                                                                                
EventLoopGroup eventLoopGroup) {
         if (dnsResolverGroup == null) {
-            dnsResolverGroup = new DnsResolverGroupImpl(eventLoopGroup, conf);
+            dnsResolverGroup = new DnsResolverGroupImpl(conf);
         }
         return () -> dnsResolverGroup.createAddressResolver(eventLoopGroup);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index c52a70c6cd7..032069786a4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -27,6 +27,8 @@ import io.netty.handler.codec.http.HttpResponse;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
@@ -59,27 +61,27 @@ public class ControlledClusterFailover implements 
ServiceUrlProvider {
     private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
     private static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
     private static final int DEFAULT_MAX_REDIRECTS = 20;
+    private final Map<String, String> headers;
+    private final String urlProvider;
 
     private PulsarClientImpl pulsarClient;
     private volatile String currentPulsarServiceUrl;
     private volatile ControlledConfiguration currentControlledConfiguration;
     private final ScheduledExecutorService executor;
     private final long interval;
-    private final AsyncHttpClient httpClient;
-    private final BoundRequestBuilder requestBuilder;
+    private AsyncHttpClient httpClient;
+    private BoundRequestBuilder requestBuilder;
 
     private ControlledClusterFailover(ControlledClusterFailoverBuilderImpl 
builder) throws IOException {
         this.currentPulsarServiceUrl = builder.defaultServiceUrl;
         this.interval = builder.interval;
         this.executor = Executors.newSingleThreadScheduledExecutor(
                 new 
ExecutorProvider.ExtendedThreadFactory("pulsar-service-provider"));
-
-        this.httpClient = buildHttpClient();
-        this.requestBuilder = httpClient.prepareGet(builder.urlProvider)
-            .addHeader("Accept", "application/json");
-
+        urlProvider = builder.urlProvider;
         if (builder.header != null && !builder.header.isEmpty()) {
-            builder.header.forEach(requestBuilder::addHeader);
+            headers = new LinkedHashMap<>(builder.header);
+        } else {
+            headers = Collections.emptyMap();
         }
     }
 
@@ -101,6 +103,8 @@ public class ControlledClusterFailover implements 
ServiceUrlProvider {
                     && super.keepAlive(remoteAddress, ahcRequest, request, 
response);
             }
         });
+        confBuilder.setNettyTimer(pulsarClient.timer());
+        confBuilder.setEventLoopGroup(pulsarClient.eventLoopGroup());
         AsyncHttpClientConfig config = confBuilder.build();
         return new DefaultAsyncHttpClient(config);
     }
@@ -108,6 +112,12 @@ public class ControlledClusterFailover implements 
ServiceUrlProvider {
     @Override
     public void initialize(PulsarClient client) {
         this.pulsarClient = (PulsarClientImpl) client;
+        this.httpClient = buildHttpClient();
+        this.requestBuilder = httpClient.prepareGet(urlProvider)
+                // share Pulsar client DNS resolver and cache
+                .setNameResolver(pulsarClient.getNameResolver())
+                .addHeader("Accept", "application/json");
+        headers.forEach(requestBuilder::addHeader);
 
         // start to check service url every 30 seconds
         this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
index 61af7968f81..0702e6324ba 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
@@ -41,36 +41,68 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
 public class DnsResolverGroupImpl implements AutoCloseable {
     private final DnsAddressResolverGroup dnsAddressResolverGroup;
 
-    public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup, 
ClientConfigurationData conf) {
+    public DnsResolverGroupImpl(ClientConfigurationData conf) {
         Optional<InetSocketAddress> bindAddress = 
Optional.ofNullable(conf.getDnsLookupBindAddress())
                 .map(addr -> new InetSocketAddress(addr, 
conf.getDnsLookupBindPort()));
         Optional<DnsServerAddressStreamProvider> dnsServerAddresses = 
Optional.ofNullable(conf.getDnsServerAddresses())
                 .filter(Predicate.not(List::isEmpty))
                 .map(SequentialDnsServerAddressStreamProvider::new);
-        this.dnsAddressResolverGroup = 
createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses);
+        this.dnsAddressResolverGroup = createAddressResolverGroup(bindAddress, 
dnsServerAddresses);
     }
 
-    public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup, 
Optional<InetSocketAddress> bindAddress,
-                                Optional<DnsServerAddressStreamProvider> 
dnsServerAddresses) {
-        this.dnsAddressResolverGroup = 
createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses);
+    public 
DnsResolverGroupImpl(PulsarClientSharedResourcesBuilderImpl.DnsResolverResourceConfig
 dnsConfig) {
+        this.dnsAddressResolverGroup = createAddressResolverGroup(dnsConfig);
     }
 
-    private static DnsAddressResolverGroup 
createAddressResolverGroup(EventLoopGroup eventLoopGroup,
-                                                                      
Optional<InetSocketAddress> bindAddress,
+    private DnsAddressResolverGroup createAddressResolverGroup(
+            PulsarClientSharedResourcesBuilderImpl.DnsResolverResourceConfig 
dnsConfig) {
+        DnsNameResolverBuilder dnsNameResolverBuilder = new 
DnsNameResolverBuilder()
+                .traceEnabled(dnsConfig.traceEnabled)
+                .channelType(EventLoopUtil.getDatagramChannelClass());
+        if (dnsConfig.tcpFallbackEnabled || 
dnsConfig.tcpFallbackOnTimeoutEnabled) {
+            
dnsNameResolverBuilder.socketChannelType(EventLoopUtil.getClientSocketChannelClass(),
+                    dnsConfig.tcpFallbackOnTimeoutEnabled);
+        }
+        dnsNameResolverBuilder
+                .ttl(dnsConfig.minTtl, dnsConfig.maxTtl)
+                .negativeTtl(dnsConfig.negativeTtl);
+        if (dnsConfig.queryTimeoutMillis > -1L) {
+            
dnsNameResolverBuilder.queryTimeoutMillis(dnsConfig.queryTimeoutMillis);
+        }
+        if (dnsConfig.ndots > -1) {
+            dnsNameResolverBuilder.ndots(dnsConfig.ndots);
+        }
+        if (dnsConfig.localAddress != null) {
+            dnsNameResolverBuilder.localAddress(dnsConfig.localAddress);
+        }
+        if (dnsConfig.serverAddresses != null) {
+            Optional.ofNullable(dnsConfig.serverAddresses)
+                    .map(SequentialDnsServerAddressStreamProvider::new)
+                    .ifPresent(dnsServerAddressStreamProvider -> {
+                        
dnsNameResolverBuilder.nameServerProvider(dnsServerAddressStreamProvider);
+                    });
+        }
+        if (dnsConfig.searchDomains != null) {
+            dnsNameResolverBuilder.searchDomains(dnsConfig.searchDomains);
+        }
+        return new DnsAddressResolverGroup(dnsNameResolverBuilder);
+    }
+
+    private static DnsAddressResolverGroup 
createAddressResolverGroup(Optional<InetSocketAddress> bindAddress,
                                                                       
Optional<DnsServerAddressStreamProvider>
                                                                               
dnsServerAddresses) {
-        DnsNameResolverBuilder dnsNameResolverBuilder = 
createDnsNameResolverBuilder(eventLoopGroup);
+        DnsNameResolverBuilder dnsNameResolverBuilder = 
createDnsNameResolverBuilder();
         bindAddress.ifPresent(dnsNameResolverBuilder::localAddress);
         
dnsServerAddresses.ifPresent(dnsNameResolverBuilder::nameServerProvider);
 
         return new DnsAddressResolverGroup(dnsNameResolverBuilder);
     }
 
-    private static DnsNameResolverBuilder 
createDnsNameResolverBuilder(EventLoopGroup eventLoopGroup) {
+    private static DnsNameResolverBuilder createDnsNameResolverBuilder() {
         DnsNameResolverBuilder dnsNameResolverBuilder = new 
DnsNameResolverBuilder()
                 .traceEnabled(true)
-                
.channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup))
-                
.socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup), 
true);
+                .channelType(EventLoopUtil.getDatagramChannelClass())
+                
.socketChannelType(EventLoopUtil.getClientSocketChannelClass(), true);
         DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
         return dnsNameResolverBuilder;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 633ca606388..72fcf82b859 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -21,9 +21,12 @@ package org.apache.pulsar.client.impl;
 import io.netty.channel.EventLoopGroup;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
+import io.netty.resolver.NameResolver;
+import io.netty.util.Timer;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
@@ -65,16 +68,20 @@ public class HttpClient implements Closeable {
 
     protected final AsyncHttpClient httpClient;
     protected final ServiceNameResolver serviceNameResolver;
+    private final NameResolver<InetAddress> nameResolver;
     protected final Authentication authentication;
     protected final ClientConfigurationData clientConf;
     protected ScheduledExecutorService executorService;
     protected PulsarSslFactory sslFactory;
 
-    protected HttpClient(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
+    protected HttpClient(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, Timer timer,
+                         NameResolver<InetAddress> nameResolver)
+            throws PulsarClientException {
         this.authentication = conf.getAuthentication();
         this.clientConf = conf;
         this.serviceNameResolver = new 
PulsarServiceNameResolver(conf.getServiceUrlQuarantineInitDurationMs(),
                 conf.getServiceUrlQuarantineMaxDurationMs());
+        this.nameResolver = nameResolver;
         this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
@@ -128,6 +135,7 @@ public class HttpClient implements Closeable {
             }
         }
         confBuilder.setEventLoopGroup(eventLoopGroup);
+        confBuilder.setNettyTimer(timer);
         AsyncHttpClientConfig config = confBuilder.build();
         httpClient = new DefaultAsyncHttpClient(config);
 
@@ -184,7 +192,9 @@ public class HttpClient implements Closeable {
 
                 // auth complete, use a new builder
                 BoundRequestBuilder builder = httpClient.prepareGet(requestUrl)
-                    .setHeader("Accept", "application/json");
+                        // share the DNS resolver and cache with Pulsar client
+                        .setNameResolver(nameResolver)
+                        .setHeader("Accept", "application/json");
 
                 if (authData.hasDataForHttp()) {
                     Set<Entry<String, String>> headers;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 4a5557fa869..08c9956b5be 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.client.impl;
 
 import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.NameResolver;
+import io.netty.util.Timer;
 import io.opentelemetry.api.common.Attributes;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -65,10 +68,17 @@ public class HttpLookupService implements LookupService {
     private final LatencyHistogram histoGetSchema;
     private final LatencyHistogram histoListTopics;
 
+    @Deprecated
     public HttpLookupService(InstrumentProvider instrumentProvider, 
ClientConfigurationData conf,
-                             EventLoopGroup eventLoopGroup)
+                             EventLoopGroup eventLoopGroup) throws 
PulsarClientException {
+        this(instrumentProvider, conf, eventLoopGroup, null, null);
+    }
+
+    public HttpLookupService(InstrumentProvider instrumentProvider, 
ClientConfigurationData conf,
+                             EventLoopGroup eventLoopGroup, Timer timer,
+                             NameResolver<InetAddress> nameResolver)
             throws PulsarClientException {
-        this.httpClient = new HttpClient(conf, eventLoopGroup);
+        this.httpClient = new HttpClient(conf, eventLoopGroup, timer, 
nameResolver);
         this.useTls = conf.isUseTls();
         this.listenerName = conf.getListenerName();
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index d38d35926a4..6ffdfa55a9b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -25,9 +25,10 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import io.netty.channel.EventLoopGroup;
 import io.netty.resolver.AddressResolver;
-import io.netty.util.HashedWheelTimer;
+import io.netty.resolver.NameResolver;
 import io.netty.util.Timer;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.time.Duration;
@@ -44,7 +45,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -97,7 +97,7 @@ import org.apache.pulsar.common.topics.TopicsPatternFactory;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,6 +131,7 @@ public class PulsarClientImpl implements PulsarClient {
     private final boolean createdEventLoopGroup;
     private final boolean createdCnxPool;
     private final DnsResolverGroupImpl dnsResolverGroupLocalInstance;
+    @Getter
     private final AddressResolver<InetSocketAddress> addressResolver;
 
     public enum State {
@@ -199,12 +200,12 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     @Builder(builderClassName = "PulsarClientImplBuilder")
-    private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool connectionPool,
-                             Timer timer, ExecutorProvider 
externalExecutorProvider,
-                             ExecutorProvider internalExecutorProvider,
-                             ScheduledExecutorProvider 
scheduledExecutorProvider,
-                             ExecutorProvider lookupExecutorProvider,
-                             DnsResolverGroupImpl dnsResolverGroup) throws 
PulsarClientException {
+    PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool connectionPool,
+                     Timer timer, ExecutorProvider externalExecutorProvider,
+                     ExecutorProvider internalExecutorProvider,
+                     ScheduledExecutorProvider scheduledExecutorProvider,
+                     ExecutorProvider lookupExecutorProvider,
+                     DnsResolverGroupImpl dnsResolverGroup) throws 
PulsarClientException {
 
         EventLoopGroup eventLoopGroupReference = null;
         ConnectionPool connectionPoolReference = null;
@@ -222,22 +223,26 @@ public class PulsarClientImpl implements PulsarClient {
             this.createdExecutorProviders = externalExecutorProvider == null;
             this.createdScheduledProviders = scheduledExecutorProvider == null;
             this.createdLookupProviders = lookupExecutorProvider == null;
-            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup 
: getEventLoopGroup(conf);
+            eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup :
+                    PulsarClientResourcesConfigurer.createEventLoopGroup(conf);
             this.eventLoopGroup = eventLoopGroupReference;
             this.instrumentProvider = new 
InstrumentProvider(conf.getOpenTelemetry());
             clientClock = conf.getClock();
             conf.getAuthentication().start();
             this.scheduledExecutorProvider = scheduledExecutorProvider != null 
? scheduledExecutorProvider :
-                    new ScheduledExecutorProvider(conf.getNumIoThreads(), 
"pulsar-client-scheduled");
+                    
PulsarClientResourcesConfigurer.createScheduledExecutorProvider(conf);
             if (connectionPool != null) {
                 connectionPoolReference = connectionPool;
                 dnsResolverGroupLocalInstance = null;
-                addressResolver = null;
+                addressResolver = dnsResolverGroup != null
+                        ? 
dnsResolverGroup.createAddressResolver(eventLoopGroupReference) : null;
             } else {
                 DnsResolverGroupImpl dnsResolverGroupReference;
                 if (dnsResolverGroup == null) {
                     dnsResolverGroupReference =
-                            dnsResolverGroupLocalInstance = new 
DnsResolverGroupImpl(eventLoopGroupReference, conf);
+                            dnsResolverGroupLocalInstance =
+                                    
PulsarClientResourcesConfigurer.createDnsResolverGroup(conf
+                                    );
                 } else {
                     dnsResolverGroupReference = dnsResolverGroup;
                     dnsResolverGroupLocalInstance = null;
@@ -254,24 +259,25 @@ public class PulsarClientImpl implements PulsarClient {
             }
             this.cnxPool = connectionPoolReference;
             this.externalExecutorProvider = externalExecutorProvider != null ? 
externalExecutorProvider :
-                    new ExecutorProvider(conf.getNumListenerThreads(), 
"pulsar-external-listener");
+                    
PulsarClientResourcesConfigurer.createExternalExecutorProvider(conf);
             this.internalExecutorProvider = internalExecutorProvider != null ? 
internalExecutorProvider :
-                    new ExecutorProvider(conf.getNumIoThreads(), 
"pulsar-client-internal");
+                    
PulsarClientResourcesConfigurer.createInternalExecutorProvider(conf);
             this.lookupExecutorProvider = lookupExecutorProvider != null ? 
lookupExecutorProvider :
-                    new ExecutorProvider(1, "pulsar-client-lookup");
+                    
PulsarClientResourcesConfigurer.createLookupExecutorProvider();
+            if (timer == null) {
+                this.timer = PulsarClientResourcesConfigurer.createTimer();
+                needStopTimer = true;
+            } else {
+                this.timer = timer;
+            }
             if (conf.getServiceUrl().startsWith("http")) {
-                lookup = new HttpLookupService(instrumentProvider, conf, 
this.eventLoopGroup);
+                lookup = new HttpLookupService(instrumentProvider, conf, 
this.eventLoopGroup, this.timer,
+                        getNameResolver());
             } else {
                 lookup = new BinaryProtoLookupService(this, 
conf.getServiceUrl(), conf.getListenerName(),
                         conf.isUseTls(), 
this.scheduledExecutorProvider.getExecutor(),
                         this.lookupExecutorProvider.getExecutor());
             }
-            if (timer == null) {
-                this.timer = new 
HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
-                needStopTimer = true;
-            } else {
-                this.timer = timer;
-            }
 
             if (conf.getServiceUrlProvider() != null) {
                 conf.getServiceUrlProvider().initialize(this);
@@ -1211,7 +1217,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     public LookupService createLookup(String url) throws PulsarClientException 
{
         if (url.startsWith("http")) {
-            return new HttpLookupService(instrumentProvider, conf, 
eventLoopGroup);
+            return new HttpLookupService(instrumentProvider, conf, 
eventLoopGroup, timer, getNameResolver());
         } else {
             return new BinaryProtoLookupService(this, url, 
conf.getListenerName(), conf.isUseTls(),
                     externalExecutorProvider.getExecutor());
@@ -1299,15 +1305,6 @@ public class PulsarClientImpl implements PulsarClient {
         });
     }
 
-    private static EventLoopGroup getEventLoopGroup(ClientConfigurationData 
conf) {
-        ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
-        return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), 
conf.isEnableBusyWait(), threadFactory);
-    }
-
-    private static ThreadFactory getThreadFactory(String poolName) {
-        return new ExecutorProvider.ExtendedThreadFactory(poolName, 
Thread.currentThread().isDaemon());
-    }
-
     void cleanupProducer(ProducerBase<?> producer) {
         producers.remove(producer);
     }
@@ -1415,4 +1412,8 @@ public class PulsarClientImpl implements PulsarClient {
     public TransactionBuilder newTransaction() {
         return new TransactionBuilderImpl(this, tcClient);
     }
+
+    /**
+     * Exposes this client's {@code addressResolver} as a Netty {@link NameResolver} by handing it to
+     * {@code DnsResolverUtil.adaptToNameResolver}.
+     *
+     * <p>NOTE(review): the constructor leaves {@code addressResolver} as {@code null} when a connection pool is
+     * passed in without a DNS resolver group; how the adapter treats a {@code null} argument is not visible
+     * here - confirm before relying on the result in that setup.
+     *
+     * @return the value returned by {@code DnsResolverUtil.adaptToNameResolver} for this client's resolver
+     */
+    NameResolver<InetAddress> getNameResolver() {
+        final AddressResolver<InetSocketAddress> socketAddressResolver = addressResolver;
+        return DnsResolverUtil.adaptToNameResolver(socketAddressResolver);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 6fe95140967..0351477985f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessagePayloadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResourcesBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -403,4 +404,9 @@ public final class PulsarClientImplementationBindingImpl 
implements PulsarClient
         }
         return new TopicMessageIdImpl(topic, messageIdAdv);
     }
+
+    /**
+     * Creates a new builder for shared Pulsar client resources.
+     *
+     * @return a newly constructed {@link PulsarClientSharedResourcesBuilderImpl}
+     */
+    @Override
+    public PulsarClientSharedResourcesBuilder newSharedResourcesBuilder() {
+        final PulsarClientSharedResourcesBuilder builder = new PulsarClientSharedResourcesBuilderImpl();
+        return builder;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
new file mode 100644
index 00000000000..28f97e92159
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+
+/**
+ * Static factories for the resources a Pulsar client runs on: the timer, the executor providers, the Netty
+ * event loop group and the DNS resolver group.
+ *
+ * <p>Methods that take a {@link ClientConfigurationData} (or no argument) build a resource from the client
+ * configuration or from fixed defaults. Methods that take one of the resource config classes of
+ * {@link PulsarClientSharedResourcesBuilderImpl} build it from that config; a {@code null} config is replaced
+ * by a default-constructed one, and a blank or {@code null} configured name falls back to the matching
+ * {@code NAME_*} / {@code POOL_NAME_*} constant.
+ */
+class PulsarClientResourcesConfigurer {
+    static final String NAME_TIMER = "pulsar-timer";
+    static final String POOL_NAME_LOOKUP_EXECUTOR = "pulsar-client-lookup";
+    static final String POOL_NAME_INTERNAL_EXECUTOR = "pulsar-client-internal";
+    static final String POOL_NAME_LISTENER_EXECUTOR = "pulsar-external-listener";
+    static final String POOL_NAME_SCHEDULED_EXECUTOR = "pulsar-client-scheduled";
+    static final String POOL_NAME_EVENT_LOOP_GROUP = "pulsar-client-io";
+    static final int LOOKUP_EXECUTOR_NUM_THREADS = 1;
+
+    private PulsarClientResourcesConfigurer() {
+        // Utility class: only static members, never instantiated.
+    }
+
+    /**
+     * Creates the default timer: named {@link #NAME_TIMER}, with a tick duration of one millisecond.
+     */
+    static HashedWheelTimer createTimer() {
+        return createTimerInternal(NAME_TIMER, 1L, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a timer whose thread comes from {@link #createThreadFactory(String)}.
+     */
+    private static HashedWheelTimer createTimerInternal(String poolName, long tickDuration, TimeUnit timeUnit) {
+        return new HashedWheelTimer(createThreadFactory(poolName), tickDuration, timeUnit);
+    }
+
+    /**
+     * Creates a timer from a timer resource config.
+     *
+     * @param resourceConfig the timer settings, or {@code null} to use a default-constructed config
+     */
+    static HashedWheelTimer createTimer(PulsarClientSharedResourcesBuilderImpl.TimerResourceConfig resourceConfig) {
+        if (resourceConfig == null) {
+            resourceConfig = new PulsarClientSharedResourcesBuilderImpl.TimerResourceConfig();
+        }
+        // Same blank-name fallback as the executor providers, so a cleared name cannot reach the thread factory.
+        String timerName = nameOrDefault(resourceConfig.name, NAME_TIMER);
+        return createTimerInternal(timerName, resourceConfig.tickDuration, resourceConfig.tickDurationTimeUnit);
+    }
+
+    /**
+     * Creates the default lookup executor provider: {@link #LOOKUP_EXECUTOR_NUM_THREADS} thread(s), named
+     * {@link #POOL_NAME_LOOKUP_EXECUTOR}.
+     */
+    static ExecutorProvider createLookupExecutorProvider() {
+        return createExecutorProviderInternal(LOOKUP_EXECUTOR_NUM_THREADS, POOL_NAME_LOOKUP_EXECUTOR);
+    }
+
+    /**
+     * Creates the lookup executor provider from a thread pool resource config.
+     *
+     * <p>The thread count is taken from the config, not from {@link #LOOKUP_EXECUTOR_NUM_THREADS}; this also
+     * holds for the default-constructed config used when {@code resourceConfig} is {@code null}.
+     *
+     * @param resourceConfig the thread pool settings, or {@code null} to use a default-constructed config
+     */
+    static ExecutorProvider createLookupExecutorProviderWithResourceConfig(
+            PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig resourceConfig) {
+        return createExecutorProviderInternal(resourceConfig, POOL_NAME_LOOKUP_EXECUTOR);
+    }
+
+    /**
+     * Creates an executor provider whose daemon flag is copied from the calling thread.
+     */
+    private static ExecutorProvider createExecutorProviderInternal(int numThreads, String poolName) {
+        return createExecutorProviderInternal(numThreads, poolName, Thread.currentThread().isDaemon());
+    }
+
+    private static ExecutorProvider createExecutorProviderInternal(int numThreads, String poolName, boolean daemon) {
+        return new ExecutorProvider(numThreads, poolName, daemon);
+    }
+
+    /**
+     * Creates the internal executor provider with {@code conf.getNumIoThreads()} threads, named
+     * {@link #POOL_NAME_INTERNAL_EXECUTOR}.
+     */
+    static ExecutorProvider createInternalExecutorProvider(ClientConfigurationData conf) {
+        return createExecutorProviderInternal(conf.getNumIoThreads(), POOL_NAME_INTERNAL_EXECUTOR);
+    }
+
+    /**
+     * Creates the internal executor provider from a thread pool resource config.
+     *
+     * @param resourceConfig the thread pool settings, or {@code null} to use a default-constructed config
+     */
+    static ExecutorProvider createInternalExecutorProviderWithResourceConfig(
+            PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig resourceConfig) {
+        return createExecutorProviderInternal(resourceConfig, POOL_NAME_INTERNAL_EXECUTOR);
+    }
+
+    /**
+     * Creates the external (listener) executor provider with {@code conf.getNumListenerThreads()} threads, named
+     * {@link #POOL_NAME_LISTENER_EXECUTOR}.
+     */
+    static ExecutorProvider createExternalExecutorProvider(ClientConfigurationData conf) {
+        return createExecutorProviderInternal(conf.getNumListenerThreads(), POOL_NAME_LISTENER_EXECUTOR);
+    }
+
+    /**
+     * Creates the external (listener) executor provider from a thread pool resource config.
+     *
+     * @param resourceConfig the thread pool settings, or {@code null} to use a default-constructed config
+     */
+    static ExecutorProvider createExternalExecutorProviderWithResourceConfig(
+            PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig resourceConfig) {
+        return createExecutorProviderInternal(resourceConfig, POOL_NAME_LISTENER_EXECUTOR);
+    }
+
+    /**
+     * Creates an executor provider from a thread pool resource config, using {@code defaultPoolName} when the
+     * config carries no usable name.
+     */
+    private static ExecutorProvider createExecutorProviderInternal(
+            PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig resourceConfig, String defaultPoolName) {
+        if (resourceConfig == null) {
+            resourceConfig = new PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig();
+        }
+        String poolName = nameOrDefault(resourceConfig.name, defaultPoolName);
+        return createExecutorProviderInternal(resourceConfig.numberOfThreads, poolName, resourceConfig.daemon);
+    }
+
+    /**
+     * Creates the scheduled executor provider with {@code conf.getNumIoThreads()} threads, named
+     * {@link #POOL_NAME_SCHEDULED_EXECUTOR}.
+     */
+    static ScheduledExecutorProvider createScheduledExecutorProvider(ClientConfigurationData conf) {
+        return new ScheduledExecutorProvider(conf.getNumIoThreads(), POOL_NAME_SCHEDULED_EXECUTOR);
+    }
+
+    /**
+     * Creates the scheduled executor provider from a thread pool resource config.
+     *
+     * @param resourceConfig the thread pool settings, or {@code null} to use a default-constructed config
+     */
+    static ScheduledExecutorProvider createScheduledExecutorProviderWithResourceConfig(
+            PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig resourceConfig) {
+        if (resourceConfig == null) {
+            resourceConfig = new PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig();
+        }
+        String poolName = nameOrDefault(resourceConfig.name, POOL_NAME_SCHEDULED_EXECUTOR);
+        return new ScheduledExecutorProvider(resourceConfig.numberOfThreads, poolName, resourceConfig.daemon);
+    }
+
+    /**
+     * Creates the event loop group with {@code conf.getNumIoThreads()} threads and the busy-wait setting of
+     * {@code conf}; threads are named {@link #POOL_NAME_EVENT_LOOP_GROUP}.
+     */
+    static EventLoopGroup createEventLoopGroup(ClientConfigurationData conf) {
+        ThreadFactory threadFactory = createThreadFactory(POOL_NAME_EVENT_LOOP_GROUP);
+        return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), conf.isEnableBusyWait(), threadFactory);
+    }
+
+    /**
+     * Creates the event loop group from an event loop resource config.
+     *
+     * @param eventLoopResourceConfig the event loop settings, or {@code null} to use a default-constructed config
+     */
+    static EventLoopGroup createEventLoopGroupWithResourceConfig(
+            PulsarClientSharedResourcesBuilderImpl.EventLoopResourceConfig eventLoopResourceConfig) {
+        if (eventLoopResourceConfig == null) {
+            eventLoopResourceConfig = new PulsarClientSharedResourcesBuilderImpl.EventLoopResourceConfig();
+        }
+        // Same blank-name fallback as the executor providers, so a cleared name cannot reach the thread factory.
+        String poolName = nameOrDefault(eventLoopResourceConfig.name, POOL_NAME_EVENT_LOOP_GROUP);
+        ThreadFactory threadFactory = createThreadFactory(poolName, eventLoopResourceConfig.daemon);
+        return EventLoopUtil.newEventLoopGroup(eventLoopResourceConfig.numberOfThreads,
+                eventLoopResourceConfig.enableBusyWait, threadFactory);
+    }
+
+    /**
+     * Creates a thread factory whose daemon flag is copied from the calling thread.
+     */
+    static ThreadFactory createThreadFactory(String poolName) {
+        return createThreadFactory(poolName, Thread.currentThread().isDaemon());
+    }
+
+    static ThreadFactory createThreadFactory(String poolName, boolean daemon) {
+        return new ExecutorProvider.ExtendedThreadFactory(poolName, daemon);
+    }
+
+    /**
+     * Creates a DNS resolver group from the client configuration.
+     */
+    static DnsResolverGroupImpl createDnsResolverGroup(ClientConfigurationData conf) {
+        return new DnsResolverGroupImpl(conf);
+    }
+
+    /**
+     * Creates a DNS resolver group from a DNS resolver resource config.
+     *
+     * @param resourceConfig the resolver settings, or {@code null} to use a default-constructed config
+     */
+    static DnsResolverGroupImpl createDnsResolverGroupWithResourceConfig(
+            PulsarClientSharedResourcesBuilderImpl.DnsResolverResourceConfig resourceConfig) {
+        if (resourceConfig == null) {
+            resourceConfig = new PulsarClientSharedResourcesBuilderImpl.DnsResolverResourceConfig();
+        }
+        return new DnsResolverGroupImpl(resourceConfig);
+    }
+
+    /**
+     * Returns {@code configuredName} unless it is {@code null}, empty or whitespace-only, in which case
+     * {@code defaultName} is returned.
+     */
+    private static String nameOrDefault(String configuredName, String defaultName) {
+        return StringUtils.isNotBlank(configuredName) ? configuredName : defaultName;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
new file mode 100644
index 00000000000..fe9d43dc41e
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Lists;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.pulsar.client.api.DnsResolverConfig;
+import org.apache.pulsar.client.api.EventLoopGroupConfig;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.api.PulsarClientSharedResourcesBuilder;
+import org.apache.pulsar.client.api.ThreadPoolConfig;
+import org.apache.pulsar.client.api.TimerConfig;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+
+/**
+ * Builder for {@link PulsarClientSharedResources}.
+ *
+ * <p>The caller either lists the resource types to share with {@code resourceTypes(...)} or calls
+ * {@link #shareConfigured()}; each of the two rejects the other with an {@link IllegalStateException}.
+ * The {@code configure...} methods lazily create a per-resource config object and hand it to the supplied
+ * configurer; they store the config in {@code resourceConfigs} and do not touch {@code sharedResources}.
+ */
+public class PulsarClientSharedResourcesBuilderImpl implements PulsarClientSharedResourcesBuilder {
+    Set<PulsarClientSharedResources.SharedResource> sharedResources = new HashSet<>();
+    Map<PulsarClientSharedResources.SharedResource, ResourceConfig> resourceConfigs = new HashMap<>();
+    private boolean shareConfigured;
+
+    /** Marker type for the per-resource config objects kept in {@code resourceConfigs}. */
+    interface ResourceConfig {
+
+    }
+
+    /**
+     * Base for configs that carry a name.
+     *
+     * @param <T> the type returned by {@link #name(String)} so calls can be chained
+     */
+    abstract static class NamedResourceConfig<T> implements ResourceConfig {
+        String name;
+
+        /** Stores the name and returns {@link #self()}. */
+        public T name(String name) {
+            this.name = name;
+            return self();
+        }
+
+        /** Returns this instance cast to {@code T}; the cast is unchecked. */
+        @SuppressWarnings("unchecked")
+        T self() {
+            return (T) this;
+        }
+    }
+
+    /**
+     * Thread pool settings. The thread count defaults to the number of available processors and the daemon
+     * flag to that of the thread constructing the config; no default name is set here.
+     */
+    static class ThreadPoolResourceConfig extends NamedResourceConfig<ThreadPoolConfig> implements ThreadPoolConfig {
+        int numberOfThreads = Runtime.getRuntime().availableProcessors();
+        boolean daemon = Thread.currentThread().isDaemon();
+
+        @Override
+        public ThreadPoolConfig numberOfThreads(int numberOfThreads) {
+            this.numberOfThreads = numberOfThreads;
+            return this;
+        }
+
+        @Override
+        public ThreadPoolConfig daemon(boolean daemon) {
+            this.daemon = daemon;
+            return this;
+        }
+    }
+
+    /**
+     * Event loop group settings. Same thread-count and daemon defaults as {@link ThreadPoolResourceConfig};
+     * busy-wait is off by default and the name defaults to
+     * {@link PulsarClientResourcesConfigurer#POOL_NAME_EVENT_LOOP_GROUP}.
+     */
+    static class EventLoopResourceConfig extends NamedResourceConfig<EventLoopGroupConfig>
+            implements EventLoopGroupConfig {
+        int numberOfThreads = Runtime.getRuntime().availableProcessors();
+        boolean daemon = Thread.currentThread().isDaemon();
+        boolean enableBusyWait;
+
+        EventLoopResourceConfig() {
+            name = PulsarClientResourcesConfigurer.POOL_NAME_EVENT_LOOP_GROUP;
+        }
+
+        @Override
+        public EventLoopGroupConfig numberOfThreads(int numberOfThreads) {
+            this.numberOfThreads = numberOfThreads;
+            return this;
+        }
+
+        @Override
+        public EventLoopGroupConfig daemon(boolean daemon) {
+            this.daemon = daemon;
+            return this;
+        }
+
+        @Override
+        public EventLoopGroupConfig enableBusyWait(boolean enableBusyWait) {
+            this.enableBusyWait = enableBusyWait;
+            return this;
+        }
+    }
+
+    /**
+     * Timer settings. The tick duration defaults to one millisecond and the name to
+     * {@link PulsarClientResourcesConfigurer#NAME_TIMER}.
+     */
+    static class TimerResourceConfig extends NamedResourceConfig<TimerConfig> implements TimerConfig {
+        long tickDuration = 1L;
+        TimeUnit tickDurationTimeUnit = TimeUnit.MILLISECONDS;
+
+        TimerResourceConfig() {
+            name = PulsarClientResourcesConfigurer.NAME_TIMER;
+        }
+
+        /** Stores the tick duration together with its unit; neither value is validated here. */
+        @Override
+        public TimerConfig tickDuration(long tickDuration, TimeUnit timeUnit) {
+            this.tickDuration = tickDuration;
+            this.tickDurationTimeUnit = timeUnit;
+            return this;
+        }
+    }
+
+    /**
+     * DNS resolver settings. TTL defaults come from {@link DnsResolverUtil}; tracing and both TCP fallback
+     * flags default to {@code true}.
+     *
+     * <p>NOTE(review): {@code queryTimeoutMillis} and {@code ndots} default to {@code -1}, presumably meaning
+     * "leave the resolver's own default" - confirm against the consumer of this config.
+     */
+    static class DnsResolverResourceConfig implements ResourceConfig, DnsResolverConfig {
+        InetSocketAddress localAddress;
+        Collection<InetSocketAddress> serverAddresses;
+        int minTtl = DnsResolverUtil.getDefaultMinTTL();
+        int maxTtl = DnsResolverUtil.getDefaultTTL();
+        int negativeTtl = DnsResolverUtil.getDefaultNegativeTTL();
+        long queryTimeoutMillis = -1L;
+        boolean traceEnabled = true;
+        boolean tcpFallbackEnabled = true;
+        boolean tcpFallbackOnTimeoutEnabled = true;
+        int ndots = -1;
+        Collection<String> searchDomains;
+
+        @Override
+        public DnsResolverConfig localAddress(InetSocketAddress localAddress) {
+            this.localAddress = localAddress;
+            return this;
+        }
+
+        /** Stores a copy of the given addresses, so later changes to the argument are not seen. */
+        @Override
+        public DnsResolverConfig serverAddresses(Iterable<InetSocketAddress> addresses) {
+            this.serverAddresses = Lists.newArrayList(addresses);
+            return this;
+        }
+
+        @Override
+        public DnsResolverConfig minTtl(int minTtl) {
+            this.minTtl = minTtl;
+            return this;
+        }
+
+        @Override
+        public DnsResolverConfig maxTtl(int maxTtl) {
+            this.maxTtl = maxTtl;
+            return this;
+        }
+
+        @Override
+        public DnsResolverConfig negativeTtl(int negativeTtl) {
+            this.negativeTtl = negativeTtl;
+            return this;
+        }
+
+        @Override
+        public DnsResolverConfig queryTimeoutMillis(long queryTimeoutMillis) {
+            this.queryTimeoutMillis = queryTimeoutMillis;
+            return this;
+        }
+
+        @Override
+        public DnsResolverConfig traceEnabled(boolean traceEnabled) {
+            this.traceEnabled = traceEnabled;
+            return this;
+        }
+
+        @Override
+        public DnsResolverConfig tcpFallbackEnabled(boolean tcpFallbackEnabled) {
+            this.tcpFallbackEnabled = tcpFallbackEnabled;
+            return this;
+        }
+
+        @Override
+        public DnsResolverConfig tcpFallbackOnTimeoutEnabled(boolean tcpFallbackOnTimeoutEnabled) {
+            this.tcpFallbackOnTimeoutEnabled = tcpFallbackOnTimeoutEnabled;
+            return this;
+        }
+
+        @Override
+        public DnsResolverConfig ndots(int ndots) {
+            this.ndots = ndots;
+            return this;
+        }
+
+        /** Stores a copy of the given search domains, so later changes to the argument are not seen. */
+        @Override
+        public DnsResolverConfig searchDomains(Iterable<String> searchDomains) {
+            this.searchDomains = Lists.newArrayList(searchDomains);
+            return this;
+        }
+    }
+
+    /**
+     * Adds the given resource types to the set of resources to share.
+     *
+     * @throws IllegalStateException if {@link #shareConfigured()} was called before
+     */
+    @Override
+    public PulsarClientSharedResourcesBuilder resourceTypes(
+            PulsarClientSharedResources.SharedResource... sharedResource) {
+        // Checked before the varargs array is converted, then again by the collection overload.
+        requireShareConfiguredNotCalled();
+        return resourceTypes(List.of(sharedResource));
+    }
+
+    /**
+     * Adds the given resource types to the set of resources to share.
+     *
+     * @throws IllegalStateException if {@link #shareConfigured()} was called before
+     */
+    @Override
+    public PulsarClientSharedResourcesBuilder resourceTypes(
+            Collection<PulsarClientSharedResources.SharedResource> sharedResource) {
+        requireShareConfiguredNotCalled();
+        sharedResources.addAll(sharedResource);
+        return this;
+    }
+
+    /** Rejects {@code resourceTypes(...)} once {@link #shareConfigured()} has been called. */
+    private void requireShareConfiguredNotCalled() {
+        if (shareConfigured) {
+            throw new IllegalStateException("Cannot set resourceTypes when shareConfigured() has already been called");
+        }
+    }
+
+    /**
+     * Sets the share-configured flag that is passed on to the built instance.
+     *
+     * @throws IllegalStateException if resource types were already added
+     */
+    @Override
+    public PulsarClientSharedResourcesBuilder shareConfigured() {
+        final boolean resourceTypesAlreadySet = !sharedResources.isEmpty();
+        if (resourceTypesAlreadySet) {
+            throw new IllegalStateException("Cannot use shareConfigured() when resourceTypes has already been set");
+        }
+        shareConfigured = true;
+        return this;
+    }
+
+    /**
+     * Returns the config stored for the resource, creating and storing a default one on first use. The cast to
+     * the caller's expected config type is unchecked.
+     */
+    @SuppressWarnings("unchecked")
+    private <T extends ResourceConfig> T getOrCreateConfig(PulsarClientSharedResources.SharedResource sharedResource) {
+        return (T) resourceConfigs.computeIfAbsent(sharedResource,
+                PulsarClientSharedResourcesBuilderImpl::newDefaultConfig);
+    }
+
+    /** Creates the default config object matching the type of the given resource. */
+    private static ResourceConfig newDefaultConfig(PulsarClientSharedResources.SharedResource resource) {
+        switch (resource.getType()) {
+            case EventLoopGroup:
+                return new EventLoopResourceConfig();
+            case DnsResolver:
+                return new DnsResolverResourceConfig();
+            case ThreadPool:
+                return new ThreadPoolResourceConfig();
+            case Timer:
+                return new TimerResourceConfig();
+            default:
+                throw new IllegalArgumentException("Unknown resource type: " + resource.getType());
+        }
+    }
+
+    /**
+     * Applies the configurer to the thread pool config of the given resource.
+     *
+     * @throws IllegalArgumentException if the resource is not of the thread pool type
+     */
+    @Override
+    public PulsarClientSharedResourcesBuilder configureThreadPool(
+            PulsarClientSharedResources.SharedResource sharedResource, Consumer<ThreadPoolConfig<?>> configurer) {
+        if (sharedResource.getType() == PulsarClientSharedResources.SharedResourceType.ThreadPool) {
+            configurer.accept(getOrCreateConfig(sharedResource));
+            return this;
+        }
+        throw new IllegalArgumentException(
+                "The shared resource " + sharedResource + " doesn't support thread pool configuration");
+    }
+
+    /** Applies the configurer to the event loop group config. */
+    @Override
+    public PulsarClientSharedResourcesBuilder configureEventLoop(Consumer<EventLoopGroupConfig> configurer) {
+        configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.EventLoopGroup));
+        return this;
+    }
+
+    /** Applies the configurer to the DNS resolver config. */
+    @Override
+    public PulsarClientSharedResourcesBuilder configureDnsResolver(Consumer<DnsResolverConfig> configurer) {
+        configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.DnsResolver));
+        return this;
+    }
+
+    /** Applies the configurer to the timer config. */
+    @Override
+    public PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig> configurer) {
+        configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.Timer));
+        return this;
+    }
+
+    /**
+     * Builds the shared resources. The builder's own set and map instances are passed on, not copies.
+     */
+    @Override
+    public PulsarClientSharedResources build() {
+        return new PulsarClientSharedResourcesImpl(sharedResources, resourceConfigs, shareConfigured);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
new file mode 100644
index 00000000000..29953784681
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createDnsResolverGroupWithResourceConfig;
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createEventLoopGroupWithResourceConfig;
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createExternalExecutorProviderWithResourceConfig;
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createInternalExecutorProviderWithResourceConfig;
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createLookupExecutorProviderWithResourceConfig;
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createScheduledExecutorProviderWithResourceConfig;
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createTimer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.Timer;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Getter;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+
+@Getter // Owns the resources shared between clients; a field stays null when its resource is not shared.
+public class PulsarClientSharedResourcesImpl implements 
PulsarClientSharedResources {
+    Set<SharedResource> sharedResources; // assigned exactly once, in the constructor
+    protected final EventLoopGroup ioEventLoopGroup;
+    private final ExecutorProvider internalExecutorProvider;
+    private final ExecutorProvider externalExecutorProvider;
+    private final ScheduledExecutorProvider scheduledExecutorProvider;
+    private final Timer timer;
+    private final ExecutorProvider lookupExecutorProvider;
+    private final DnsResolverGroupImpl dnsResolverGroup;
+
+    public PulsarClientSharedResourcesImpl(Set<SharedResource> sharedResources,
+                                           Map<SharedResource, 
PulsarClientSharedResourcesBuilderImpl.ResourceConfig>
+                                                   resourceConfigs, boolean 
shareConfigured) {
+        if (shareConfigured) { // share exactly the resources that have a config entry
+            if (resourceConfigs.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "No resources have been configured while using 
shareConfigured mode");
+            }
+            this.sharedResources = Set.copyOf(resourceConfigs.keySet());
+        } else {
+            if (sharedResources.isEmpty()) { // nothing selected explicitly: share every SharedResource
+                this.sharedResources = EnumSet.allOf(SharedResource.class);
+            } else {
+                this.sharedResources = Set.copyOf(sharedResources);
+            }
+        }
+        this.ioEventLoopGroup = 
this.sharedResources.contains(SharedResource.EventLoopGroup)
+                ? createEventLoopGroupWithResourceConfig(
+                getResourceConfig(resourceConfigs, 
SharedResource.EventLoopGroup))
+                : null; // same pattern for every field below: create when shared, otherwise null
+        this.externalExecutorProvider = 
this.sharedResources.contains(SharedResource.ListenerExecutor)
+                ? createExternalExecutorProviderWithResourceConfig(
+                getResourceConfig(resourceConfigs, 
SharedResource.ListenerExecutor))
+                : null;
+        this.internalExecutorProvider = 
this.sharedResources.contains(SharedResource.InternalExecutor)
+                ? createInternalExecutorProviderWithResourceConfig(
+                getResourceConfig(resourceConfigs, 
SharedResource.InternalExecutor))
+                : null;
+        this.scheduledExecutorProvider = 
this.sharedResources.contains(SharedResource.ScheduledExecutor)
+                ? createScheduledExecutorProviderWithResourceConfig(
+                getResourceConfig(resourceConfigs, 
SharedResource.ScheduledExecutor))
+                : null;
+        this.lookupExecutorProvider = 
this.sharedResources.contains(SharedResource.LookupExecutor)
+                ? createLookupExecutorProviderWithResourceConfig(
+                getResourceConfig(resourceConfigs, 
SharedResource.LookupExecutor))
+                : null;
+        this.timer = this.sharedResources.contains(SharedResource.Timer)
+                ? createTimer(getResourceConfig(resourceConfigs, 
SharedResource.Timer))
+                : null;
+        this.dnsResolverGroup = 
this.sharedResources.contains(SharedResource.DnsResolver)
+                ? createDnsResolverGroupWithResourceConfig(
+                getResourceConfig(resourceConfigs, SharedResource.DnsResolver))
+                : null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T extends PulsarClientSharedResourcesBuilderImpl.ResourceConfig> 
T getResourceConfig(
+            Map<SharedResource, 
PulsarClientSharedResourcesBuilderImpl.ResourceConfig> resourceConfigs,
+            SharedResource resource) {
+        return (T) resourceConfigs.get(resource); // unchecked cast; null when no config was registered for resource
+    }
+
+    @Override
+    public boolean contains(SharedResource sharedResource) {
+        return sharedResources.contains(sharedResource);
+    }
+
+    public Set<SharedResource> getSharedResources() {
+        return sharedResources; // no defensive copy; the EnumSet.allOf default from the constructor is mutable
+    }
+
+    @Override
+    public void close() throws PulsarClientException { // touches only the non-null (i.e. shared) resources
+        if (externalExecutorProvider != null) {
+            externalExecutorProvider.shutdownNow();
+        }
+        if (internalExecutorProvider != null) {
+            internalExecutorProvider.shutdownNow();
+        }
+        if (scheduledExecutorProvider != null) {
+            scheduledExecutorProvider.shutdownNow();
+        }
+        if (lookupExecutorProvider != null) {
+            lookupExecutorProvider.shutdownNow();
+        }
+        if (dnsResolverGroup != null) {
+            dnsResolverGroup.close();
+        }
+        if (timer != null) {
+            timer.stop();
+        }
+        if (ioEventLoopGroup != null) { // the event loop group is shut down last
+            EventLoopUtil.shutdownGracefully(ioEventLoopGroup);
+        }
+    }
+
+    public void applyTo(PulsarClientImpl.PulsarClientImplBuilder 
instanceBuilder) {
+        if (externalExecutorProvider != null) { // only shared (non-null) resources are set; others stay untouched
+            instanceBuilder.externalExecutorProvider(externalExecutorProvider);
+        }
+        if (internalExecutorProvider != null) {
+            instanceBuilder.internalExecutorProvider(internalExecutorProvider);
+        }
+        if (scheduledExecutorProvider != null) {
+            
instanceBuilder.scheduledExecutorProvider(scheduledExecutorProvider);
+        }
+        if (lookupExecutorProvider != null) {
+            instanceBuilder.lookupExecutorProvider(lookupExecutorProvider);
+        }
+        if (dnsResolverGroup != null) {
+            instanceBuilder.dnsResolverGroup(dnsResolverGroup);
+        }
+        if (timer != null) {
+            instanceBuilder.timer(timer);
+        }
+        if (ioEventLoopGroup != null) {
+            instanceBuilder.eventLoopGroup(ioEventLoopGroup);
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 88654c51300..ae3b532a054 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -61,13 +61,17 @@ public class ExecutorProvider {
     }
 
     public ExecutorProvider(int numThreads, String poolName) {
+        this(numThreads, poolName, Thread.currentThread().isDaemon()); // daemon flag copied from the constructing thread
+    }
+
+    public ExecutorProvider(int numThreads, String poolName, boolean daemon) {
         checkArgument(numThreads > 0);
         this.numThreads = numThreads;
         Objects.requireNonNull(poolName);
         executors = new ArrayList<>(numThreads);
         for (int i = 0; i < numThreads; i++) {
             ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
-                    poolName, Thread.currentThread().isDaemon());
+                    poolName, daemon);
             ExecutorService executor = createExecutor(threadFactory);
             executors.add(Pair.of(executor, threadFactory));
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
index 3ebd41ebe56..b47659e514a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
@@ -24,6 +24,9 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class ScheduledExecutorProvider extends ExecutorProvider {
+    public ScheduledExecutorProvider(int numThreads, String poolName, boolean 
daemon) {
+        super(numThreads, poolName, daemon); // forwards all three arguments unchanged to ExecutorProvider
+    }
 
     public ScheduledExecutorProvider(int numThreads, String poolName) {
         super(numThreads, poolName);
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml 
b/pulsar-client/src/main/resources/findbugsExclude.xml
index f7cf6b9cfd5..802c03622b0 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -257,6 +257,10 @@
         <Method name="&lt;init&gt;"/>
         <Bug pattern="EI_EXPOSE_REP2"/>
     </Match>
+    <Match>
+      <Class 
name="org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl"/>
+      <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
     <Match>
         <Class name="org.apache.pulsar.client.impl.ClientCnx"/>
         <Method name="getPendingRequests"/>
@@ -712,6 +716,11 @@
         <Method name="&lt;init&gt;"/>
         <Bug pattern="EI_EXPOSE_REP2"/>
     </Match>
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.PulsarClientImpl"/>
+        <Method name="&lt;init&gt;"/>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
     <Match>
         <Class name="org.apache.pulsar.client.impl.PulsarClientImpl"/>
         <Method name="getConfiguration"/>
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index 265af6dd23c..ca4aca6c329 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -54,6 +54,12 @@ public class ControlledClusterFailoverTest {
             .build();
 
         ControlledClusterFailover controlledClusterFailover = 
(ControlledClusterFailover) provider;
+
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
+        controlledClusterFailover.initialize(pulsarClient);
+
         Request request = 
controlledClusterFailover.getRequestBuilder().build();
 
         Assert.assertTrue(provider instanceof ControlledClusterFailover);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImplTest.java
new file mode 100644
index 00000000000..f01906911a6
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImplTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.testng.annotations.Test;
+
+public class PulsarClientSharedResourcesBuilderImplTest { // builds shared resources, creates clients on them, closes all
+    @Test
+    public void testSharedResources() throws PulsarClientException { // default builder; pulsar:// first, then http://
+        
runClientsWithSharedResources(PulsarClientSharedResources.builder().build(), 
1000, false);
+        
runClientsWithSharedResources(PulsarClientSharedResources.builder().build(), 
1000, true);
+    }
+
+    private static void 
runClientsWithSharedResources(PulsarClientSharedResources sharedResources, int 
numberOfClients,
+                                                      boolean useHttpLookup)
+            throws PulsarClientException {
+        List<PulsarClient> clients = new ArrayList<>();
+        for (int i = 0; i < numberOfClients; i++) { // every client is built before any of them is closed
+            clients.add(PulsarClient.builder()
+                    .serviceUrl(useHttpLookup ? "http://localhost:8080" : 
"pulsar://localhost:6650")
+                    .sharedResources(sharedResources)
+                    .build());
+        }
+        for (PulsarClient client : clients) {
+            client.close();
+        }
+        sharedResources.close(); // closed here, after all clients; callers do not need to close it again
+    }
+
+    @Test
+    public void testPulsarClientSharedResourcesBuilderUsingPublicAPI() throws 
PulsarClientException {
+        PulsarClientSharedResources sharedResources = 
PulsarClientSharedResources.builder()
+                .configureEventLoop(eventLoopGroupConfig -> {
+                    eventLoopGroupConfig
+                            .name("testEventLoop")
+                            .numberOfThreads(10);
+                })
+                .configureDnsResolver(dnsResolverConfig -> {
+                    dnsResolverConfig.localAddress(new InetSocketAddress(0));
+                })
+                
.configureThreadPool(PulsarClientSharedResources.SharedResource.ListenerExecutor,
+                        threadPoolConfig -> {
+                            threadPoolConfig.name("testListenerThreadPool")
+                                    .daemon(true)
+                                    .numberOfThreads(12);
+                        }
+                )
+                
.configureThreadPool(PulsarClientSharedResources.SharedResource.InternalExecutor,
+                        threadPoolConfig -> {
+                            threadPoolConfig.name("testInternalThreadPool")
+                                    .daemon(true)
+                                    .numberOfThreads(2);
+                        }
+                )
+                
.configureThreadPool(PulsarClientSharedResources.SharedResource.LookupExecutor,
+                        threadPoolConfig -> {
+                            threadPoolConfig.name("testLookupThreadPool")
+                                    .daemon(true)
+                                    .numberOfThreads(1);
+                        }
+                )
+                
.configureThreadPool(PulsarClientSharedResources.SharedResource.ScheduledExecutor,
+                        threadPoolConfig -> {
+                            threadPoolConfig.name("testSchedulerThreadPool")
+                                    .daemon(true)
+                                    .numberOfThreads(1);
+                        }
+                )
+                .configureTimer(timerConfig -> {
+                    timerConfig.name("testTimer").tickDuration(100, 
TimeUnit.MILLISECONDS);
+                })
+                .build();
+        runClientsWithSharedResources(sharedResources, 1000, false);
+        sharedResources.close(); // second close(): the helper above already closed it, so close() must tolerate repeats
+    }
+
+    @Test
+    public void testPartialSharing() throws PulsarClientException {
+        PulsarClientSharedResources sharedResources =
+                PulsarClientSharedResources.builder()
+                        .shareConfigured() // only the two resources configured below end up shared
+                        .configureEventLoop(eventLoopGroupConfig -> {
+                    
eventLoopGroupConfig.name("testEventLoop").numberOfThreads(10);
+                }).configureDnsResolver(dnsResolverConfig -> {
+                    dnsResolverConfig.localAddress(new InetSocketAddress(0));
+                }).build();
+        runClientsWithSharedResources(sharedResources, 2, false);
+        sharedResources.close(); // second close(): already closed inside runClientsWithSharedResources
+    }
+
+    @Test
+    public void testDnsResolverConfig() throws PulsarClientException {
+        PulsarClientSharedResources sharedResources =
+                PulsarClientSharedResources.builder()
+                        .shareConfigured() // only the DNS resolver is configured, so only it is shared
+                        .configureDnsResolver(dnsResolverConfig -> {
+                            dnsResolverConfig
+                                    .localAddress(new InetSocketAddress(0))
+                                    .serverAddresses(List.of(new 
InetSocketAddress("8.8.8.8", 53)))
+                                    .minTtl(0)
+                                    .maxTtl(45)
+                                    .negativeTtl(10)
+                                    .ndots(1)
+                                    .searchDomains(List.of("mycompany.com"))
+                                    .queryTimeoutMillis(5000L)
+                                    .tcpFallbackEnabled(true)
+                                    .tcpFallbackOnTimeoutEnabled(false)
+                                    .traceEnabled(false);
+                        }).build();
+        runClientsWithSharedResources(sharedResources, 2, false);
+        sharedResources.close(); // second close(): already closed inside runClientsWithSharedResources
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
index bcff83acd94..92c025092d3 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -18,10 +18,24 @@
  */
 package org.apache.pulsar.common.util.netty;
 
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.DefaultNameResolver;
+import io.netty.resolver.InetNameResolver;
+import io.netty.resolver.InetSocketAddressResolver;
+import io.netty.resolver.NameResolver;
 import io.netty.resolver.dns.DnsNameResolverBuilder;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.Promise;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.security.Security;
+import java.util.List;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
 
 @Slf4j
 public class DnsResolverUtil {
@@ -61,8 +75,15 @@ public class DnsResolverUtil {
                     .map(Integer::decode)
                     .filter(i -> i > 0)
                     .orElseGet(() -> {
-                        if (System.getSecurityManager() == null) {
-                            return JDK_DEFAULT_TTL;
+                        try {
+                            if (System.getSecurityManager() == null) {
+                                return JDK_DEFAULT_TTL;
+                            }
+                        } catch (Throwable t) {
+                            log.warn("Cannot use current logic to resolve JDK 
default DNS TTL settings. Use "
+                                            + "sun.net.inetaddr.ttl and 
sun.net.inetaddr.negative.ttl system "
+                                            + "properties for setting default 
values for DNS TTL settings. {}",
+                                    t.getMessage());
                         }
                         return DEFAULT_TTL;
                     });
@@ -92,4 +113,97 @@ public class DnsResolverUtil {
         dnsNameResolverBuilder.ttl(MIN_TTL, TTL);
         dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL);
     }
+
+    public static int getDefaultMinTTL() { // exposes MIN_TTL, the minimum this class passes to DnsNameResolverBuilder.ttl
+        return MIN_TTL;
+    }
+
+    public static int getDefaultTTL() { // exposes TTL, the maximum this class passes to DnsNameResolverBuilder.ttl
+        return TTL;
+    }
+
+    public static int getDefaultNegativeTTL() { // exposes NEGATIVE_TTL, as passed to DnsNameResolverBuilder.negativeTtl
+        return NEGATIVE_TTL;
+    }
+
+    /**
+     * Extracts the underlying Netty NameResolver from an AddressResolver 
instance or creates an adapter as the
+     * fallback. If null is passed in, a default Netty NameResolver is 
returned which delegates to the
+     * blocking JDK DNS resolver.
+     *
+     * @param addressResolver Netty AddressResolver instance or null
+     * @return Netty NameResolver instance
+     */
+    @SuppressWarnings("unchecked")
+    public static NameResolver<InetAddress> 
adaptToNameResolver(AddressResolver<InetSocketAddress> addressResolver) {
+        if (addressResolver == null) {
+            return new DefaultNameResolver(ImmediateEventExecutor.INSTANCE);
+        }
+        // Use reflection to extract underlying Netty NameResolver instance.
+        if (InetSocketAddressResolver.class.isInstance(addressResolver)) { // other types go straight to the adapter
+            try {
+                Field nameResolverField =
+                        
FieldUtils.getDeclaredField(InetSocketAddressResolver.class, "nameResolver", 
true);
+                if (nameResolverField != null) {
+                    return (NameResolver<InetAddress>) 
FieldUtils.readField(nameResolverField, addressResolver);
+                } else {
+                    log.warn("Could not find nameResolver Field in 
InetSocketAddressResolver instance.");
+                }
+            } catch (Throwable t) { // best effort: the failure is only logged, then the adapter fallback below is used
+                log.warn("Failed to extract NameResolver from 
InetSocketAddressResolver instance. {}", t.getMessage());
+            }
+        }
+        // fallback to use an adapter if reflection fails
+        log.info("Creating NameResolver adapter that wraps an AddressResolver 
instance.");
+        return createNameResolverAdapter(addressResolver, 
ImmediateEventExecutor.INSTANCE);
+    }
+
+    /**
+     * Creates a NameResolver adapter that wraps an AddressResolver instance.
+     * <p>
+     * This adapter is necessary because Netty doesn't provide a direct 
implementation for converting
+     * between AddressResolver and NameResolver, while AsyncHttpClient 
specifically requires a NameResolver.
+     * The adapter handles the resolution of hostnames to IP addresses by 
delegating to the underlying
+     * AddressResolver.
+     *
+     * @param addressResolver the AddressResolver instance to adapt, handling 
InetSocketAddress resolution
+     * @param executor        the EventExecutor to be used for executing 
resolution tasks
+     * @return a NameResolver instance that wraps the provided AddressResolver
+     */
+    static NameResolver<InetAddress> createNameResolverAdapter(
+            AddressResolver<InetSocketAddress> addressResolver, EventExecutor 
executor) {
+        return new InetNameResolver(executor) {
+            @Override
+            protected void doResolve(String inetHost, Promise<InetAddress> 
promise) throws Exception {
+                Promise<InetSocketAddress> delegatedPromise = 
executor().newPromise();
+                
addressResolver.resolve(InetSocketAddress.createUnresolved(inetHost, 1), 
delegatedPromise); // port 1 is a placeholder; only getAddress() of the result is used
+                delegatedPromise.addListener(new 
GenericFutureListener<Promise<InetSocketAddress>>() {
+                    @Override
+                    public void operationComplete(Promise<InetSocketAddress> 
future) throws Exception {
+                        if (future.isSuccess()) {
+                            promise.setSuccess(future.get().getAddress()); // drop the port, keep the InetAddress
+                        } else {
+                            promise.setFailure(future.cause()); // propagate the delegate's failure unchanged
+                        }
+                    }
+                });
+            }
+
+            @Override
+            protected void doResolveAll(String inetHost, 
Promise<List<InetAddress>> promise) throws Exception {
+                Promise<List<InetSocketAddress>> delegatedPromise = 
executor().newPromise();
+                
addressResolver.resolveAll(InetSocketAddress.createUnresolved(inetHost, 1), 
delegatedPromise); // same placeholder port as in doResolve
+                delegatedPromise.addListener(new 
GenericFutureListener<Promise<List<InetSocketAddress>>>() {
+                    @Override
+                    public void 
operationComplete(Promise<List<InetSocketAddress>> future) throws Exception {
+                        if (future.isSuccess()) {
+                            
promise.setSuccess(future.get().stream().map(InetSocketAddress::getAddress).toList());
+                        } else {
+                            promise.setFailure(future.cause()); // propagate the delegate's failure unchanged
+                        }
+                    }
+                });
+            }
+        };
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
index cfc766890f0..501fe53f3a3 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
@@ -57,13 +57,7 @@ public class EventLoopUtil {
      */
     public static EventLoopGroup newEventLoopGroup(int nThreads, boolean 
enableBusyWait, ThreadFactory threadFactory) {
         if (Epoll.isAvailable()) {
-            String enableIoUring = System.getProperty(ENABLE_IO_URING);
-
-            // By default, io_uring will not be enabled, even if available. 
The environment variable will be used:
-            // enable.io_uring=1
-            if (StringUtils.equalsAnyIgnoreCase(enableIoUring, "1", "true")) {
-                // Throw exception if IOUring cannot be used
-                IOUring.ensureAvailability();
+            if (isIoUringEnabledAndAvailable()) {
                 return new IOUringEventLoopGroup(nThreads, threadFactory);
             } else {
                 if (!enableBusyWait) {
@@ -96,6 +90,17 @@ public class EventLoopUtil {
         }
     }
 
+    private static boolean isIoUringEnabledAndAvailable() {
+        // By default, io_uring will not be enabled, even if available. The 
environment variable will be used:
+        // enable.io_uring=1 (read via System.getProperty below, i.e. a JVM system property, not an OS env variable)
+        boolean ioUringEnabled = 
StringUtils.equalsAnyIgnoreCase(System.getProperty(ENABLE_IO_URING), "1", 
"true");
+        if (ioUringEnabled) {
+            // Throw exception if IOUring cannot be used
+            IOUring.ensureAvailability();
+        }
+        return ioUringEnabled; // when true, ensureAvailability() has already been called without throwing
+    }
+
     /**
      * Return a SocketChannel class suitable for the given EventLoopGroup 
implementation.
      *
@@ -112,6 +117,25 @@ public class EventLoopUtil {
         }
     }
 
+    /**
+     * Returns the most appropriate SocketChannel implementation based on the 
system's capabilities
+     * and configuration. Precedence:
+     * 1) IO_uring (only if Epoll is available and io_uring is enabled via the ENABLE_IO_URING system property)
+     * 2) Epoll (if available)
+     * 3) NIO (fallback)
+     */
+    public static Class<? extends SocketChannel> getClientSocketChannelClass() 
{
+        if (Epoll.isAvailable()) {
+            if (isIoUringEnabledAndAvailable()) { // may throw: the helper calls IOUring.ensureAvailability() when enabled
+                return IOUringSocketChannel.class;
+            } else {
+                return EpollSocketChannel.class;
+            }
+        } else {
+            return NioSocketChannel.class;
+        }
+    }
+
     public static Class<? extends ServerSocketChannel> 
getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
         if (eventLoopGroup instanceof IOUringEventLoopGroup) {
             return IOUringServerSocketChannel.class;
@@ -122,6 +146,25 @@ public class EventLoopUtil {
         }
     }
 
+    /**
+     * Returns the most appropriate ServerSocketChannel implementation based 
on the system's capabilities
+     * and configuration. Precedence:
+     * 1) IO_uring (only if Epoll is available and io_uring is enabled via the ENABLE_IO_URING system property)
+     * 2) Epoll (if available)
+     * 3) NIO (fallback)
+     */
+    public static Class<? extends ServerSocketChannel> 
getServerSocketChannelClass() {
+        if (Epoll.isAvailable()) {
+            if (isIoUringEnabledAndAvailable()) { // may throw: the helper calls IOUring.ensureAvailability() when enabled
+                return IOUringServerSocketChannel.class;
+            } else {
+                return EpollServerSocketChannel.class;
+            }
+        } else {
+            return NioServerSocketChannel.class;
+        }
+    }
+
     public static Class<? extends DatagramChannel> 
getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
         if (eventLoopGroup instanceof IOUringEventLoopGroup) {
             return IOUringDatagramChannel.class;
@@ -132,6 +175,25 @@ public class EventLoopUtil {
         }
     }
 
+    /**
+     * Returns the most appropriate DatagramChannel implementation based on 
the system's capabilities
+     * and configuration. Precedence:
+     * 1) IO_uring (only if Epoll is available and io_uring is enabled via the ENABLE_IO_URING system property)
+     * 2) Epoll (if available)
+     * 3) NIO (fallback)
+     */
+    public static Class<? extends DatagramChannel> getDatagramChannelClass() {
+        if (Epoll.isAvailable()) {
+            if (isIoUringEnabledAndAvailable()) { // may throw: the helper calls IOUring.ensureAvailability() when enabled
+                return IOUringDatagramChannel.class;
+            } else {
+                return EpollDatagramChannel.class;
+            }
+        } else {
+            return NioDatagramChannel.class;
+        }
+    }
+
     public static void enableTriggeredMode(ServerBootstrap bootstrap) {
         if (Epoll.isAvailable()) {
             bootstrap.childOption(EpollChannelOption.EPOLL_MODE, 
EpollMode.LEVEL_TRIGGERED);
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
index 46599cc45a0..62b59f89368 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
@@ -31,6 +31,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+// This test needs to be run separately since it relies on static field values
 public class DnsResolverTest {
     private static final int MIN_TTL = 0;
     private static final int TTL = 101;
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverUtilTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverUtilTest.java
new file mode 100644
index 00000000000..beb68be6ed3
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverUtilTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import static org.testng.Assert.assertEquals;
+import com.google.common.net.InetAddresses;
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.DefaultNameResolver;
+import io.netty.resolver.NameResolver;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.testng.annotations.Test;
+
+public class DnsResolverUtilTest { // exercises the NameResolver helpers of DnsResolverUtil
+    // Wraps a DefaultNameResolver-backed AddressResolver via createNameResolverAdapter and checks it.
+    @Test
+    public void testNameResolverAdapter() throws ExecutionException, 
InterruptedException {
+        ImmediateEventExecutor executor = ImmediateEventExecutor.INSTANCE;
+        AddressResolver<InetSocketAddress> addressResolver =
+                new DefaultNameResolver(executor).asAddressResolver();
+        NameResolver<InetAddress> nameResolverAdapter =
+                DnsResolverUtil.createNameResolverAdapter(addressResolver, 
executor);
+        testNameResolver(nameResolverAdapter);
+    }
+    // Same AddressResolver setup, but converted through adaptToNameResolver instead of the explicit adapter.
+    @Test
+    public void testNameResolverExtraction() throws ExecutionException, 
InterruptedException {
+        ImmediateEventExecutor executor = ImmediateEventExecutor.INSTANCE;
+        AddressResolver<InetSocketAddress> addressResolver =
+                new DefaultNameResolver(executor).asAddressResolver();
+        NameResolver<InetAddress> nameResolverAdapter =
+                DnsResolverUtil.adaptToNameResolver(addressResolver);
+        testNameResolver(nameResolverAdapter);
+    }
+    // Passes null to adaptToNameResolver; the returned resolver must still pass the shared checks.
+    @Test
+    public void testNameResolverNullFallback() throws ExecutionException, 
InterruptedException {
+        NameResolver<InetAddress> nameResolverAdapter = 
DnsResolverUtil.adaptToNameResolver(null);
+        testNameResolver(nameResolverAdapter);
+    }
+    // Shared check: resolve() and the first resolveAll() entry for "8.8.8.8" must equal that address.
+    private static void testNameResolver(NameResolver<InetAddress> 
nameResolver)
+            throws InterruptedException, ExecutionException {
+        InetAddress inetAddress = nameResolver.resolve("8.8.8.8").get();
+        assertEquals(inetAddress, InetAddresses.forString("8.8.8.8"));
+        List<InetAddress> inetAddresses = 
nameResolver.resolveAll("8.8.8.8").get();
+        assertEquals(inetAddresses.get(0), InetAddresses.forString("8.8.8.8"));
+    }
+}
\ No newline at end of file

Reply via email to