This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 39b186269c6 [Proxy & Client] Configure Netty DNS resolver to match JDK
DNS caching setting, share DNS resolver instance in Proxy (#15219)
39b186269c6 is described below
commit 39b186269c6ccdfa94412a28cc3719a4338368ac
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Apr 20 14:47:25 2022 +0300
[Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching
setting, share DNS resolver instance in Proxy (#15219)
* Align Netty DNS resolver cache settings with Java DNS cache settings
- Netty DNS resolver caches forever by default
- this could cause problems with Kubernetes deployments
* Share Netty DNSNameResolver in proxy
* Remove overriding of ConnectionPool.close since it's not necessary
* Address review comment: remove ProxyConnectionPool
---
.../apache/pulsar/client/impl/ConnectionPool.java | 24 ++++++-
pulsar-common/pom.xml | 5 ++
.../pulsar/common/util/netty/DnsResolverUtil.java | 75 ++++++++++++++++++++++
.../pulsar/proxy/server/ProxyConnection.java | 36 ++++++-----
.../pulsar/proxy/server/ProxyConnectionPool.java | 58 -----------------
.../apache/pulsar/proxy/server/ProxyService.java | 3 +
.../proxy/server/ServiceChannelInitializer.java | 2 +-
7 files changed, 125 insertions(+), 78 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 06b5d24e8b4..edb2a983f25 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -36,6 +36,7 @@ import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,6 +48,7 @@ import
org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,13 +64,20 @@ public class ConnectionPool implements AutoCloseable {
private final boolean isSniProxy;
protected final DnsNameResolver dnsResolver;
+ private final boolean shouldCloseDnsResolver;
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
}
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup,
- Supplier<ClientCnx> clientCnxSupplier) throws
PulsarClientException {
+ Supplier<ClientCnx> clientCnxSupplier) throws
PulsarClientException {
+ this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
+ }
+
+ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup,
+ Supplier<ClientCnx> clientCnxSupplier,
Optional<DnsNameResolver> dnsNameResolver)
+ throws PulsarClientException {
this.eventLoopGroup = eventLoopGroup;
this.clientConfig = conf;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -91,6 +100,12 @@ public class ConnectionPool implements AutoCloseable {
log.error("Failed to create channel initializer");
throw new PulsarClientException(e);
}
+
+ this.shouldCloseDnsResolver = !dnsNameResolver.isPresent();
+ this.dnsResolver = dnsNameResolver.orElseGet(() ->
createDnsNameResolver(conf, eventLoopGroup));
+ }
+
+ private static DnsNameResolver
createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) {
DnsNameResolverBuilder dnsNameResolverBuilder = new
DnsNameResolverBuilder(eventLoopGroup.next())
.traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
if (conf.getDnsLookupBindAddress() != null) {
@@ -98,7 +113,8 @@ public class ConnectionPool implements AutoCloseable {
conf.getDnsLookupBindPort());
dnsNameResolverBuilder.localAddress(addr);
}
- this.dnsResolver = dnsNameResolverBuilder.build();
+ DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+ return dnsNameResolverBuilder.build();
}
private static final Random random = new Random();
@@ -320,7 +336,9 @@ public class ConnectionPool implements AutoCloseable {
@Override
public void close() throws Exception {
closeAllConnections();
- dnsResolver.close();
+ if (shouldCloseDnsResolver) {
+ dnsResolver.close();
+ }
}
private void cleanupConnection(InetSocketAddress address, int
connectionKey,
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 1b3f798de2c..e892b53962a 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -72,6 +72,11 @@
<artifactId>netty-handler</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-resolver-dns</artifactId>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
new file mode 100644
index 00000000000..8b06dbf36ec
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DnsResolverUtil {
+ private static final int MIN_TTL = 0;
+ private static final int TTL;
+ private static final int NEGATIVE_TTL;
+
+ // default TTL value when JDK setting is "forever" (-1) or cannot be read
+ private static final int DEFAULT_TTL = 60;
+
+ // default negative TTL value when JDK setting is "forever" (-1) or cannot be read
+ private static final int DEFAULT_NEGATIVE_TTL = 10;
+
+ static {
+ int ttl = DEFAULT_TTL;
+ int negativeTtl = DEFAULT_NEGATIVE_TTL;
+ try {
+ // use reflection to call sun.net.InetAddressCachePolicy's get and
getNegative methods for getting
+ // effective JDK settings for DNS caching
+ Class<?> inetAddressCachePolicyClass =
Class.forName("sun.net.InetAddressCachePolicy");
+ Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
+ ttl = (Integer) getTTLMethod.invoke(null);
+ Method getNegativeTTLMethod =
inetAddressCachePolicyClass.getMethod("getNegative");
+ negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
+ } catch (NoSuchMethodException | ClassNotFoundException |
InvocationTargetException
+ | IllegalAccessException e) {
+ log.warn("Cannot get DNS TTL settings from
sun.net.InetAddressCachePolicy class", e);
+ }
+ TTL = useDefaultTTLWhenSetToForever(ttl, DEFAULT_TTL);
+ NEGATIVE_TTL = useDefaultTTLWhenSetToForever(negativeTtl,
DEFAULT_NEGATIVE_TTL);
+ }
+
+ private static int useDefaultTTLWhenSetToForever(int ttl, int defaultTtl) {
+ return ttl < 0 ? defaultTtl : ttl;
+ }
+
+ private DnsResolverUtil() {
+ // utility class with static methods, prevent instantiation
+ }
+
+ /**
+ * Configure Netty's {@link DnsNameResolverBuilder}'s ttl and negativeTtl
to match the JDK's DNS caching settings.
+ * If the JDK setting for TTL is forever (-1), the TTL will be set to 60
seconds; if the JDK setting for negative TTL is forever (-1), it will be set to 10 seconds.
+ *
+ * @param dnsNameResolverBuilder The Netty {@link DnsNameResolverBuilder}
instance to apply the settings
+ */
+ public static void applyJdkDnsCacheSettings(DnsNameResolverBuilder
dnsNameResolverBuilder) {
+ dnsNameResolverBuilder.ttl(MIN_TTL, TTL);
+ dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL);
+ }
+}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 58203eee51c..99f8f04ea84 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -25,9 +25,11 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslHandler;
+import io.netty.resolver.dns.DnsNameResolver;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -71,6 +73,7 @@ public class ProxyConnection extends PulsarHandler {
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0,
Long.MAX_VALUE / 2));
private final ProxyService service;
+ private final DnsNameResolver dnsNameResolver;
AuthenticationDataSource authenticationData;
private State state;
private final Supplier<SslHandler> sslHandlerSupplier;
@@ -119,9 +122,11 @@ public class ProxyConnection extends PulsarHandler {
return connectionPool;
}
- public ProxyConnection(ProxyService proxyService, Supplier<SslHandler>
sslHandlerSupplier) {
+ public ProxyConnection(ProxyService proxyService, Supplier<SslHandler>
sslHandlerSupplier,
+ DnsNameResolver dnsNameResolver) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
+ this.dnsNameResolver = dnsNameResolver;
this.state = State.Init;
this.sslHandlerSupplier = sslHandlerSupplier;
this.brokerProxyValidator = service.getBrokerProxyValidator();
@@ -229,27 +234,26 @@ public class ProxyConnection extends PulsarHandler {
}
private synchronized void completeConnect(AuthData clientData) throws
PulsarClientException {
+ Supplier<ClientCnx> clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
if
(service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
- if (this.connectionPool == null) {
- this.connectionPool = new ProxyConnectionPool(clientConf,
service.getWorkerGroup(),
- () -> new ProxyClientCnx(clientConf,
service.getWorkerGroup(), clientAuthRole, clientAuthData,
- clientAuthMethod, protocolVersionToAdvertise));
- } else {
- LOG.error("BUG! Connection Pool has already been created for
proxy connection to {} state {} role {}",
- remoteAddress, state, clientAuthRole);
- }
+ clientCnxSupplier =
+ () -> new ProxyClientCnx(clientConf,
service.getWorkerGroup(), clientAuthRole, clientAuthData,
+ clientAuthMethod, protocolVersionToAdvertise);
} else {
- if (this.connectionPool == null) {
- this.connectionPool = new ProxyConnectionPool(clientConf,
service.getWorkerGroup(),
- () -> new ClientCnx(clientConf,
service.getWorkerGroup(), protocolVersionToAdvertise));
- } else {
- LOG.error("BUG! Connection Pool has already been created for
proxy connection to {} state {}",
- remoteAddress, state);
- }
+ clientCnxSupplier =
+ () -> new ClientCnx(clientConf, service.getWorkerGroup(),
protocolVersionToAdvertise);
+ }
+
+ if (this.connectionPool == null) {
+ this.connectionPool = new ConnectionPool(clientConf,
service.getWorkerGroup(),
+ clientCnxSupplier, Optional.of(dnsNameResolver));
+ } else {
+ LOG.error("BUG! Connection Pool has already been created for proxy
connection to {} state {} role {}",
+ remoteAddress, state, clientAuthRole);
}
LOG.info("[{}] complete connection, init proxy handler. authenticated
with {} role {}, hasProxyToBrokerUrl: {}",
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
deleted file mode 100644
index 4dcb09570c6..00000000000
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.proxy.server;
-
-import io.netty.channel.EventLoopGroup;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ProxyConnectionPool extends ConnectionPool {
- public ProxyConnectionPool(ClientConfigurationData clientConfig,
EventLoopGroup eventLoopGroup,
- Supplier<ClientCnx> clientCnxSupplier) throws
PulsarClientException {
- super(clientConfig, eventLoopGroup, clientCnxSupplier);
- }
-
- @Override
- public void close() throws IOException {
- log.info("Closing ProxyConnectionPool.");
- pool.forEach((address, clientCnxPool) -> {
- if (clientCnxPool != null) {
- clientCnxPool.forEach((identifier, clientCnx) -> {
- if (clientCnx != null && clientCnx.isDone()) {
- try {
- clientCnx.get().close();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Unable to close get client connection
future.", e);
- }
- }
- });
- }
- });
- dnsResolver.close();
- }
-
- private static final Logger log =
LoggerFactory.getLogger(ProxyConnectionPool.class);
-}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 6a830657423..10b99aeaff1 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -163,6 +164,8 @@ public class ProxyService implements Closeable {
DnsNameResolverBuilder dnsNameResolverBuilder = new
DnsNameResolverBuilder(workerGroup.next())
.channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+ DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+
dnsNameResolver = dnsNameResolverBuilder.build();
brokerProxyValidator = new
BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index e5f15b66d31..1a588b481fc 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -173,7 +173,7 @@ public class ServiceChannelInitializer extends
ChannelInitializer<SocketChannel>
}
ch.pipeline().addLast("handler",
- new ProxyConnection(proxyService, sslHandlerSupplier));
+ new ProxyConnection(proxyService, sslHandlerSupplier,
proxyService.getDnsNameResolver()));
}
}