This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2f9dcd8e3f730d3a80e437ab595349d584d01d62 Author: Lari Hotari <[email protected]> AuthorDate: Fri Sep 26 01:04:28 2025 +0300 [improve][client/broker] Add DnsResolverGroup to share DNS cache across multiple PulsarClient instances (#24784) (cherry picked from commit 1050f48035c97a14094b812467f046f6d37ff6f4) --- .../org/apache/pulsar/broker/PulsarService.java | 23 ++++-- .../pulsar/client/impl/ConnectionPoolTest.java | 4 +- .../apache/pulsar/client/impl/ConnectionPool.java | 52 ++++++------- .../pulsar/client/impl/DnsResolverGroupImpl.java | 86 ++++++++++++++++++++++ .../pulsar/client/impl/PulsarClientImpl.java | 51 ++++++++++--- .../pulsar/proxy/server/ProxyConnection.java | 2 +- 6 files changed, 171 insertions(+), 47 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5fd63502f8e..aa5c0b0ad2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -143,6 +143,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClient; +import org.apache.pulsar.client.impl.DnsResolverGroupImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; @@ -266,6 +267,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider; private final Timer brokerClientSharedTimer; private final ExecutorProvider brokerClientSharedLookupExecutorProvider; + private final DnsResolverGroupImpl brokerClientSharedDnsResolverGroup; private MetricsGenerator metricsGenerator; private final PulsarBrokerOpenTelemetry openTelemetry; @@ -398,6 +400,9 @@ public class PulsarService implements AutoCloseable, ShutdownService { new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); this.brokerClientSharedLookupExecutorProvider = new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor"); + this.brokerClientSharedDnsResolverGroup = + new DnsResolverGroupImpl(this.ioEventLoopGroup, + loadBrokerClientProperties(new ClientConfigurationData())); // here in the constructor we don't have the offloader scheduler yet this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0); @@ -697,6 +702,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { brokerClientSharedInternalExecutorProvider.shutdownNow(); brokerClientSharedScheduledExecutorProvider.shutdownNow(); brokerClientSharedLookupExecutorProvider.shutdownNow(); + brokerClientSharedDnsResolverGroup.close(); brokerClientSharedTimer.stop(); if (monotonicClock instanceof AutoCloseable c) { c.close(); @@ -1711,7 +1717,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider) - .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider); + .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider) + .dnsResolverGroup(brokerClientSharedDnsResolverGroup); if (customizer != null) { customizer.accept(pulsarClientImplBuilder); } @@ -1740,10 +1747,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { // Apply all arbitrary configuration. This must be called before setting any fields annotated as // @Secret on the ClientConfigurationData object because of the way they are serialized. // See https://github.com/apache/pulsar/issues/8509 for more information. - Map<String, Object> overrides = PropertiesUtils - .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_"); - ClientConfigurationData conf = - ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class); + ClientConfigurationData conf = loadBrokerClientProperties(initialConf); // Disabled auto release useless connections // The automatic release connection feature is not yet perfect for transaction scenarios, so turn it @@ -1789,6 +1793,15 @@ public class PulsarService implements AutoCloseable, ShutdownService { return conf; } + // load plain brokerClient_ properties without complete initialization + private ClientConfigurationData loadBrokerClientProperties(ClientConfigurationData initialConf) { + Map<String, Object> overrides = PropertiesUtils + .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_"); + ClientConfigurationData conf = + ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class); + return conf; + } + public synchronized PulsarAdmin getAdminClient() throws PulsarServerException { if (this.adminClient == null) { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java index 5cc04d63108..d739d93f1bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import io.netty.channel.EventLoopGroup; import io.netty.resolver.AbstractAddressResolver; +import io.netty.resolver.AddressResolver; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; @@ -260,7 +261,8 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop, (Supplier<ClientCnx>) () -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop), - Optional.of(resolver), scheduledExecutorService); + Optional.<Supplier<AddressResolver<InetSocketAddress>>>of(() -> resolver), + scheduledExecutorService); ClientCnx cnx = pool.getConnection( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 59baccfbb99..f8412b5bf29 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -27,9 +27,6 @@ import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.resolver.AddressResolver; -import io.netty.resolver.dns.DnsAddressResolverGroup; -import io.netty.resolver.dns.DnsNameResolverBuilder; -import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ScheduledFuture; import io.opentelemetry.api.common.Attributes; @@ -51,6 +48,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; +import lombok.Builder; +import lombok.NonNull; import lombok.Value; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; @@ -61,7 +60,6 @@ import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.metrics.Unit; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.netty.DnsResolverUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +78,7 @@ public class ConnectionPool implements AutoCloseable { private final boolean isSniProxy; protected final AddressResolver<InetSocketAddress> addressResolver; + private DnsResolverGroupImpl dnsResolverGroup; private final boolean shouldCloseDnsResolver; @@ -106,8 +105,7 @@ public class ConnectionPool implements AutoCloseable { public ConnectionPool(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ScheduledExecutorService scheduledExecutorService) throws PulsarClientException { - this(instrumentProvider, conf, eventLoopGroup, () -> new ClientCnx(instrumentProvider, conf, eventLoopGroup), - scheduledExecutorService); + this(instrumentProvider, conf, eventLoopGroup, null, scheduledExecutorService); } public ConnectionPool(InstrumentProvider instrumentProvider, @@ -118,12 +116,16 @@ public class ConnectionPool implements AutoCloseable { scheduledExecutorService); } - public ConnectionPool(InstrumentProvider instrumentProvider, - ClientConfigurationData conf, EventLoopGroup eventLoopGroup, + @Builder(builderClassName = "ConnectionPoolBuilder") + public ConnectionPool(@NonNull InstrumentProvider instrumentProvider, + @NonNull ClientConfigurationData conf, @NonNull EventLoopGroup eventLoopGroup, Supplier<ClientCnx> clientCnxSupplier, - Optional<AddressResolver<InetSocketAddress>> addressResolver, + @NonNull Optional<Supplier<AddressResolver<InetSocketAddress>>> addressResolverSupplier, ScheduledExecutorService scheduledExecutorService) throws PulsarClientException { + if (clientCnxSupplier == null) { + clientCnxSupplier = () -> new ClientCnx(instrumentProvider, conf, eventLoopGroup); + } this.eventLoopGroup = eventLoopGroup; this.clientConfig = conf; this.maxConnectionsPerHosts = conf.getConnectionsPerBroker(); @@ -152,8 +154,9 @@ public class ConnectionPool implements AutoCloseable { throw new PulsarClientException(e); } - this.shouldCloseDnsResolver = !addressResolver.isPresent(); - this.addressResolver = addressResolver.orElseGet(() -> createAddressResolver(conf, eventLoopGroup)); + this.shouldCloseDnsResolver = !addressResolverSupplier.isPresent(); + this.addressResolver = + addressResolverSupplier.orElseGet(() -> createAddressResolver(conf, eventLoopGroup)).get(); // Auto release useless connections. see: https://github.com/apache/pulsar/issues/15516. this.connectionMaxIdleSeconds = conf.getConnectionMaxIdleSeconds(); this.autoReleaseIdleConnectionsEnabled = connectionMaxIdleSeconds > 0; @@ -185,26 +188,12 @@ public class ConnectionPool implements AutoCloseable { Attributes.builder().put("pulsar.failure.type", "handshake").build()); } - private static AddressResolver<InetSocketAddress> createAddressResolver(ClientConfigurationData conf, - EventLoopGroup eventLoopGroup) { - DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder() - .traceEnabled(true) - .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)) - .socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup), true); - if (conf.getDnsLookupBindAddress() != null) { - InetSocketAddress addr = new InetSocketAddress(conf.getDnsLookupBindAddress(), - conf.getDnsLookupBindPort()); - dnsNameResolverBuilder.localAddress(addr); - } - List<InetSocketAddress> serverAddresses = conf.getDnsServerAddresses(); - if (serverAddresses != null && !serverAddresses.isEmpty()) { - dnsNameResolverBuilder.nameServerProvider(new SequentialDnsServerAddressStreamProvider(serverAddresses)); + private Supplier<AddressResolver<InetSocketAddress>> createAddressResolver(ClientConfigurationData conf, + EventLoopGroup eventLoopGroup) { + if (dnsResolverGroup == null) { + dnsResolverGroup = new DnsResolverGroupImpl(eventLoopGroup, conf); } - DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); - // use DnsAddressResolverGroup to create the AddressResolver since it contains a solution - // to prevent cache stampede / thundering herds problem when a DNS entry expires while the system - // is under high load - return new DnsAddressResolverGroup(dnsNameResolverBuilder).getResolver(eventLoopGroup.next()); + return () -> dnsResolverGroup.createAddressResolver(eventLoopGroup); } private static final Random random = new Random(); @@ -479,6 +468,9 @@ public class ConnectionPool implements AutoCloseable { if (shouldCloseDnsResolver) { addressResolver.close(); } + if (dnsResolverGroup != null) { + dnsResolverGroup.close(); + } if (asyncReleaseUselessConnectionsTask != null && !asyncReleaseUselessConnectionsTask.isCancelled()) { asyncReleaseUselessConnectionsTask.cancel(false); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java new file mode 100644 index 00000000000..61af7968f81 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolver; +import io.netty.resolver.dns.DnsAddressResolverGroup; +import io.netty.resolver.dns.DnsNameResolverBuilder; +import io.netty.resolver.dns.DnsServerAddressStreamProvider; +import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Optional; +import java.util.function.Predicate; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.util.netty.DnsResolverUtil; +import org.apache.pulsar.common.util.netty.EventLoopUtil; + +/** + * An abstraction to manage a group of Netty {@link AddressResolver} instances. + * Uses {@link io.netty.resolver.dns.DnsAddressResolverGroup} to create the {@link AddressResolver} instance + * since it contains a shared DNS cache and a solution to prevent cache stampede / thundering herds problem + * when a DNS entry expires while the system is under high load. + */ +public class DnsResolverGroupImpl implements AutoCloseable { + private final DnsAddressResolverGroup dnsAddressResolverGroup; + + public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup, ClientConfigurationData conf) { + Optional<InetSocketAddress> bindAddress = Optional.ofNullable(conf.getDnsLookupBindAddress()) + .map(addr -> new InetSocketAddress(addr, conf.getDnsLookupBindPort())); + Optional<DnsServerAddressStreamProvider> dnsServerAddresses = Optional.ofNullable(conf.getDnsServerAddresses()) + .filter(Predicate.not(List::isEmpty)) + .map(SequentialDnsServerAddressStreamProvider::new); + this.dnsAddressResolverGroup = createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses); + } + + public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup, Optional<InetSocketAddress> bindAddress, + Optional<DnsServerAddressStreamProvider> dnsServerAddresses) { + this.dnsAddressResolverGroup = createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses); + } + + private static DnsAddressResolverGroup createAddressResolverGroup(EventLoopGroup eventLoopGroup, + Optional<InetSocketAddress> bindAddress, + Optional<DnsServerAddressStreamProvider> + dnsServerAddresses) { + DnsNameResolverBuilder dnsNameResolverBuilder = createDnsNameResolverBuilder(eventLoopGroup); + bindAddress.ifPresent(dnsNameResolverBuilder::localAddress); + dnsServerAddresses.ifPresent(dnsNameResolverBuilder::nameServerProvider); + + return new DnsAddressResolverGroup(dnsNameResolverBuilder); + } + + private static DnsNameResolverBuilder createDnsNameResolverBuilder(EventLoopGroup eventLoopGroup) { + DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder() + .traceEnabled(true) + .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)) + .socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup), true); + DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); + return dnsNameResolverBuilder; + } + + @Override + public void close() { + this.dnsAddressResolverGroup.close(); + } + + public AddressResolver<InetSocketAddress> createAddressResolver(EventLoopGroup eventLoopGroup) { + return dnsAddressResolverGroup.getResolver(eventLoopGroup.next()); + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 01b27f491e1..d38d35926a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -24,6 +24,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolver; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import java.io.IOException; @@ -129,6 +130,8 @@ public class PulsarClientImpl implements PulsarClient { private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; private final boolean createdCnxPool; + private final DnsResolverGroupImpl dnsResolverGroupLocalInstance; + private final AddressResolver<InetSocketAddress> addressResolver; public enum State { Open, Closing, Closed @@ -168,22 +171,22 @@ public class PulsarClientImpl implements PulsarClient { private TransactionCoordinatorClientImpl tcClient; public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { - this(conf, null, null, null, null, null, null, null); + this(conf, null, null, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, null, null, null, null, null, null); + this(conf, eventLoopGroup, null, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null, null, null, null, null); + this(conf, eventLoopGroup, cnxPool, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null); + this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, @@ -192,7 +195,7 @@ public class PulsarClientImpl implements PulsarClient { ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException { this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider, - scheduledExecutorProvider, null); + scheduledExecutorProvider, null, null); } @Builder(builderClassName = "PulsarClientImplBuilder") @@ -200,7 +203,8 @@ public class PulsarClientImpl implements PulsarClient { Timer timer, ExecutorProvider externalExecutorProvider, ExecutorProvider internalExecutorProvider, ScheduledExecutorProvider scheduledExecutorProvider, - ExecutorProvider lookupExecutorProvider) throws PulsarClientException { + ExecutorProvider lookupExecutorProvider, + DnsResolverGroupImpl dnsResolverGroup) throws PulsarClientException { EventLoopGroup eventLoopGroupReference = null; ConnectionPool connectionPoolReference = null; @@ -225,10 +229,29 @@ public class PulsarClientImpl implements PulsarClient { conf.getAuthentication().start(); this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider : new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled"); - connectionPoolReference = - connectionPool != null ? connectionPool : - new ConnectionPool(instrumentProvider, conf, this.eventLoopGroup, - (ScheduledExecutorService) this.scheduledExecutorProvider.getExecutor()); + if (connectionPool != null) { + connectionPoolReference = connectionPool; + dnsResolverGroupLocalInstance = null; + addressResolver = null; + } else { + DnsResolverGroupImpl dnsResolverGroupReference; + if (dnsResolverGroup == null) { + dnsResolverGroupReference = + dnsResolverGroupLocalInstance = new DnsResolverGroupImpl(eventLoopGroupReference, conf); + } else { + dnsResolverGroupReference = dnsResolverGroup; + dnsResolverGroupLocalInstance = null; + } + addressResolver = dnsResolverGroupReference.createAddressResolver(eventLoopGroupReference); + connectionPoolReference = ConnectionPool.builder() + .instrumentProvider(instrumentProvider) + .conf(conf) + .eventLoopGroup(eventLoopGroupReference) + .addressResolverSupplier(Optional.of(() -> addressResolver)) + .scheduledExecutorService( + (ScheduledExecutorService) this.scheduledExecutorProvider.getExecutor()) + .build(); + } this.cnxPool = connectionPoolReference; this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); @@ -919,6 +942,14 @@ public class PulsarClientImpl implements PulsarClient { conf.getServiceUrlProvider().close(); } + if (addressResolver != null) { + addressResolver.close(); + } + + if (dnsResolverGroupLocalInstance != null) { + dnsResolverGroupLocalInstance.close(); + } + try { // Shutting down eventLoopGroup separately because in some cases, cnxPool might be using different // eventLoopGroup. diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 372d45ffe60..e479b8ee622 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -404,7 +404,7 @@ public class ProxyConnection extends PulsarHandler { if (this.connectionPool == null) { this.connectionPool = new ConnectionPool(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(), clientCnxSupplier, - Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null); + Optional.of(() -> dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null); } else { LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", remoteAddress, state, maybeAnonymizedClientAuthRole);
