This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 837cfbd9a7bb9a5c6836ae851d0b8ee43b5a46f4 Author: Lari Hotari <[email protected]> AuthorDate: Wed Apr 20 14:11:43 2022 +0300 [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219) - make Netty DNS resolver settings match the JDK DNS caching settings - with the exception that the max TTL is 60 seconds if DNS max TTL is the default (forever) - reuse the DNS resolver instance on the Proxy (cherry picked from commit f5adc17ed52db2da3c1c58ee4abc9b75b73464ac) --- .../apache/pulsar/client/impl/ConnectionPool.java | 26 ++++++-- pulsar-common/pom.xml | 5 ++ .../pulsar/common/util/netty/DnsResolverUtil.java | 75 ++++++++++++++++++++++ .../pulsar/proxy/server/ProxyConnection.java | 36 ++++++----- .../pulsar/proxy/server/ProxyConnectionPool.java | 60 ----------------- .../apache/pulsar/proxy/server/ProxyService.java | 3 + .../proxy/server/ServiceChannelInitializer.java | 2 +- 7 files changed, 126 insertions(+), 81 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 92b4a3cda70..99492784e5f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -35,6 +35,7 @@ import java.net.URISyntaxException; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -46,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.netty.DnsResolverUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,13 +63,20 @@ public class ConnectionPool implements AutoCloseable { private final boolean isSniProxy; protected final DnsNameResolver dnsResolver; + private final boolean shouldCloseDnsResolver; public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup)); } public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, - Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException { + Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException { + this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty()); + } + + public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, + Supplier<ClientCnx> clientCnxSupplier, Optional<DnsNameResolver> dnsNameResolver) + throws PulsarClientException { this.eventLoopGroup = eventLoopGroup; this.clientConfig = conf; this.maxConnectionsPerHosts = conf.getConnectionsPerBroker(); @@ -91,8 +100,15 @@ public class ConnectionPool implements AutoCloseable { throw new PulsarClientException(e); } - this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true) - .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build(); + this.shouldCloseDnsResolver = !dnsNameResolver.isPresent(); + this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup)); + } + + private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { + DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next()) + .traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)); + DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); + return dnsNameResolverBuilder.build(); } private static final Random random = new Random(); @@ -314,7 +330,9 @@ public class ConnectionPool implements AutoCloseable { @Override public void close() throws Exception { closeAllConnections(); - dnsResolver.close(); + if (shouldCloseDnsResolver) { + dnsResolver.close(); + } } private void cleanupConnection(InetSocketAddress address, int connectionKey, diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index e37022a3c74..0d746cd35e3 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -78,6 +78,11 @@ <artifactId>netty-handler</artifactId> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-resolver-dns</artifactId> + </dependency> + <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport-native-epoll</artifactId> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java new file mode 100644 index 00000000000..8b06dbf36ec --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.netty; + +import io.netty.resolver.dns.DnsNameResolverBuilder; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DnsResolverUtil { + private static final int MIN_TTL = 0; + private static final int TTL; + private static final int NEGATIVE_TTL; + + // default TTL value when JDK setting is "forever" (-1) + private static final int DEFAULT_TTL = 60; + + // default negative TTL value when JDK setting is "forever" (-1) + private static final int DEFAULT_NEGATIVE_TTL = 10; + + static { + int ttl = DEFAULT_TTL; + int negativeTtl = DEFAULT_NEGATIVE_TTL; + try { + // use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting + // effective JDK settings for DNS caching + Class<?> inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy"); + Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get"); + ttl = (Integer) getTTLMethod.invoke(null); + Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative"); + negativeTtl = (Integer) getNegativeTTLMethod.invoke(null); + } catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException + | IllegalAccessException e) { + log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e); + } + TTL = useDefaultTTLWhenSetToForever(ttl, DEFAULT_TTL); + NEGATIVE_TTL = useDefaultTTLWhenSetToForever(negativeTtl, DEFAULT_NEGATIVE_TTL); + } + + private static int useDefaultTTLWhenSetToForever(int ttl, int defaultTtl) { + return ttl < 0 ? defaultTtl : ttl; + } + + private DnsResolverUtil() { + // utility class with static methods, prevent instantiation + } + + /** + * Configure Netty's {@link DnsNameResolverBuilder}'s ttl and negativeTtl to match the JDK's DNS caching settings. + * If the JDK setting for TTL is forever (-1), the TTL will be set to 60 seconds. + * + * @param dnsNameResolverBuilder The Netty {@link DnsNameResolverBuilder} instance to apply the settings + */ + public static void applyJdkDnsCacheSettings(DnsNameResolverBuilder dnsNameResolverBuilder) { + dnsNameResolverBuilder.ttl(MIN_TTL, TTL); + dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL); + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 754e33f96a8..395d16bbbd8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -22,9 +22,11 @@ import static com.google.common.base.Preconditions.checkArgument; import io.netty.channel.ChannelFutureListener; import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.resolver.dns.DnsNameResolver; import java.net.SocketAddress; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -77,6 +79,7 @@ public class ProxyConnection extends PulsarHandler { private final AtomicLong requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); private final ProxyService service; + private final DnsNameResolver dnsNameResolver; private Authentication clientAuthentication; AuthenticationDataSource authenticationData; private State state; @@ -124,9 +127,11 @@ public class ProxyConnection extends PulsarHandler { return connectionPool; } - public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) { + public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier, + DnsNameResolver dnsNameResolver) { super(30, TimeUnit.SECONDS); this.service = proxyService; + this.dnsNameResolver = dnsNameResolver; this.state = State.Init; this.sslHandlerSupplier = sslHandlerSupplier; this.brokerProxyValidator = service.getBrokerProxyValidator(); @@ -234,27 +239,26 @@ public class ProxyConnection extends PulsarHandler { } private synchronized void completeConnect(AuthData clientData) throws PulsarClientException { + Supplier<ClientCnx> clientCnxSupplier; if (service.getConfiguration().isAuthenticationEnabled()) { if (service.getConfiguration().isForwardAuthorizationCredentials()) { this.clientAuthData = clientData; this.clientAuthMethod = authMethod; } - if (this.connectionPool == null) { - this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, - clientAuthMethod, protocolVersionToAdvertise)); - } else { - LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", - remoteAddress, state, clientAuthRole); - } + clientCnxSupplier = + () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, + clientAuthMethod, protocolVersionToAdvertise); } else { - if (this.connectionPool == null) { - this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise)); - } else { - LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}", - remoteAddress, state); - } + clientCnxSupplier = + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise); + } + + if (this.connectionPool == null) { + this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(), + clientCnxSupplier, Optional.of(dnsNameResolver)); + } else { + LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", + remoteAddress, state, clientAuthRole); } LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java deleted file mode 100644 index cd1b31d3434..00000000000 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.proxy.server; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; - -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.ClientCnx; -import org.apache.pulsar.client.impl.ConnectionPool; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.channel.EventLoopGroup; - -public class ProxyConnectionPool extends ConnectionPool { - public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup, - Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException { - super(clientConfig, eventLoopGroup, clientCnxSupplier); - } - - @Override - public void close() throws IOException { - log.info("Closing ProxyConnectionPool."); - pool.forEach((address, clientCnxPool) -> { - if (clientCnxPool != null) { - clientCnxPool.forEach((identifier, clientCnx) -> { - if (clientCnx != null && clientCnx.isDone()) { - try { - clientCnx.get().close(); - } catch (InterruptedException | ExecutionException e) { - log.error("Unable to close get client connection future.", e); - } - } - }); - } - }); - dnsResolver.close(); - } - - private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class); -} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index c9348837dcf..f24caf943b0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -52,6 +52,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.netty.DnsResolverUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; import org.apache.pulsar.proxy.stats.TopicStats; @@ -147,6 +148,8 @@ public class ProxyService implements Closeable { DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next()) .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup)); + DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); + dnsNameResolver = dnsNameResolverBuilder.build(); brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(), diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index a033a87912d..4aae1196fb6 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -159,7 +159,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel> } ch.pipeline().addLast("handler", - new ProxyConnection(proxyService, sslHandlerSupplier)); + new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver())); } }
