This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new cf035aef29f [Proxy/Client] Fix DNS server denial-of-service issue when 
DNS entry expires (#15403)
cf035aef29f is described below

commit cf035aef29f5341dbe0062a4252be086b5a9083d
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 3 10:59:19 2022 +0800

    [Proxy/Client] Fix DNS server denial-of-service issue when DNS entry 
expires (#15403)
    
    (cherry picked from commit 40d71691dab2a09d3457f8fa638b19ebc2e28dd7)
---
 .../pulsar/client/impl/ConnectionPoolTest.java     | 37 ++++++++-------
 .../apache/pulsar/client/impl/ConnectionPool.java  | 53 +++++++++++-----------
 .../pulsar/proxy/server/ProxyConnection.java       | 16 +++----
 .../apache/pulsar/proxy/server/ProxyService.java   | 12 ++---
 .../proxy/server/ServiceChannelInitializer.java    |  2 +-
 5 files changed, 61 insertions(+), 59 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 235bd7167a5..30583bb64cd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -21,6 +21,10 @@ package org.apache.pulsar.client.impl;
 import com.google.common.collect.Lists;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -30,11 +34,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.IntStream;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 
@@ -42,12 +41,14 @@ import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
 public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
 
     String serviceUrl;
+    int brokerPort;
 
     @BeforeClass
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
-        serviceUrl = "pulsar://non-existing-dns-name:" + 
pulsar.getBrokerListenPort().get();
+        brokerPort = pulsar.getBrokerListenPort().get();
+        serviceUrl = "pulsar://non-existing-dns-name:" + brokerPort;
     }
 
     @AfterClass(alwaysRun = true)
@@ -64,9 +65,11 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
         conf.setServiceUrl(serviceUrl);
         PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
 
-        List<InetAddress> result = Lists.newArrayList();
-        result.add(InetAddress.getByName("127.0.0.1"));
-        
Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
+        List<InetSocketAddress> result = Lists.newArrayList();
+        result.add(new InetSocketAddress("127.0.0.1", brokerPort));
+        
Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
+                brokerPort)))
+                .thenReturn(CompletableFuture.completedFuture(result));
 
         
client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
 
@@ -76,20 +79,20 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testDoubleIpAddress() throws Exception {
-        String serviceUrl = "pulsar://non-existing-dns-name:" + 
pulsar.getBrokerListenPort().get();
-
         ClientConfigurationData conf = new ClientConfigurationData();
         EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, 
new DefaultThreadFactory("test"));
         ConnectionPool pool = 
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
         conf.setServiceUrl(serviceUrl);
         PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
 
-        List<InetAddress> result = Lists.newArrayList();
+        List<InetSocketAddress> result = Lists.newArrayList();
 
         // Add a non existent IP to the response to check that we're trying 
the 2nd address as well
-        result.add(InetAddress.getByName("127.0.0.99"));
-        result.add(InetAddress.getByName("127.0.0.1"));
-        
Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));
+        result.add(new InetSocketAddress("127.0.0.99", brokerPort));
+        result.add(new InetSocketAddress("127.0.0.1", brokerPort));
+        
Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
+                        brokerPort)))
+                .thenReturn(CompletableFuture.completedFuture(result));
 
         // Create producer should succeed by trying the 2nd IP
         
client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
@@ -106,7 +109,7 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
         ConnectionPool pool = 
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
 
         InetSocketAddress brokerAddress =
-            InetSocketAddress.createUnresolved("127.0.0.1", 
pulsar.getBrokerListenPort().get());
+            InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
         IntStream.range(1, 5).forEach(i -> {
             pool.getConnection(brokerAddress).thenAccept(cnx -> {
                 Assert.assertTrue(cnx.channel().isActive());
@@ -128,7 +131,7 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
         ConnectionPool pool = 
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
 
         InetSocketAddress brokerAddress =
-            InetSocketAddress.createUnresolved("127.0.0.1", 
pulsar.getBrokerListenPort().get());
+            InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
         IntStream.range(1, 10).forEach(i -> {
             pool.getConnection(brokerAddress).thenAccept(cnx -> {
                 Assert.assertTrue(cnx.channel().isActive());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 59aaac0477b..15517e45ba3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -26,10 +26,10 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelException;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
-import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.dns.DnsAddressResolverGroup;
 import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.Future;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -63,7 +63,7 @@ public class ConnectionPool implements AutoCloseable {
     private final int maxConnectionsPerHosts;
     private final boolean isSniProxy;
 
-    protected final DnsNameResolver dnsResolver;
+    protected final AddressResolver<InetSocketAddress> addressResolver;
     private final boolean shouldCloseDnsResolver;
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
@@ -76,7 +76,8 @@ public class ConnectionPool implements AutoCloseable {
     }
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup,
-                             Supplier<ClientCnx> clientCnxSupplier, 
Optional<DnsNameResolver> dnsNameResolver)
+                          Supplier<ClientCnx> clientCnxSupplier,
+                          Optional<AddressResolver<InetSocketAddress>> 
addressResolver)
             throws PulsarClientException {
         this.eventLoopGroup = eventLoopGroup;
         this.clientConfig = conf;
@@ -101,15 +102,19 @@ public class ConnectionPool implements AutoCloseable {
             throw new PulsarClientException(e);
         }
 
-        this.shouldCloseDnsResolver = !dnsNameResolver.isPresent();
-        this.dnsResolver = dnsNameResolver.orElseGet(() -> 
createDnsNameResolver(conf, eventLoopGroup));
+        this.shouldCloseDnsResolver = !addressResolver.isPresent();
+        this.addressResolver = addressResolver.orElseGet(() -> 
createAddressResolver(conf, eventLoopGroup));
     }
 
-    private static DnsNameResolver 
createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) {
-        DnsNameResolverBuilder dnsNameResolverBuilder = new 
DnsNameResolverBuilder(eventLoopGroup.next())
+    private static AddressResolver<InetSocketAddress> 
createAddressResolver(ClientConfigurationData conf,
+                                                                            
EventLoopGroup eventLoopGroup) {
+        DnsNameResolverBuilder dnsNameResolverBuilder = new 
DnsNameResolverBuilder()
                 
.traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
         DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
-        return dnsNameResolverBuilder.build();
+        // use DnsAddressResolverGroup to create the AddressResolver since it 
contains a solution
+        // to prevent cache stampede / thundering herds problem when a DNS 
entry expires while the system
+        // is under high load
+        return new 
DnsAddressResolverGroup(dnsNameResolverBuilder).getResolver(eventLoopGroup.next());
     }
 
     private static final Random random = new Random();
@@ -234,19 +239,17 @@ public class ConnectionPool implements AutoCloseable {
      * Resolve DNS asynchronously and attempt to connect to any IP address 
returned by DNS server.
      */
     private CompletableFuture<Channel> createConnection(InetSocketAddress 
unresolvedAddress) {
-        int port;
-        CompletableFuture<List<InetAddress>> resolvedAddress;
+        CompletableFuture<List<InetSocketAddress>> resolvedAddress;
         try {
             if (isSniProxy) {
                 URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
-                port = proxyURI.getPort();
-                resolvedAddress = resolveName(proxyURI.getHost());
+                resolvedAddress =
+                        
resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), 
proxyURI.getPort()));
             } else {
-                port = unresolvedAddress.getPort();
-                resolvedAddress = 
resolveName(unresolvedAddress.getHostString());
+                resolvedAddress = resolveName(unresolvedAddress);
             }
             return resolvedAddress.thenCompose(
-                    inetAddresses -> 
connectToResolvedAddresses(inetAddresses.iterator(), port,
+                    inetAddresses -> 
connectToResolvedAddresses(inetAddresses.iterator(),
                             isSniProxy ? unresolvedAddress : null));
         } catch (URISyntaxException e) {
             log.error("Invalid Proxy url {}", 
clientConfig.getProxyServiceUrl(), e);
@@ -259,18 +262,17 @@ public class ConnectionPool implements AutoCloseable {
      * Try to connect to a sequence of IP addresses until a successful 
connection can be made, or fail if no
      * address is working.
      */
-    private CompletableFuture<Channel> 
connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
-                                                                  int port,
+    private CompletableFuture<Channel> 
connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses,
                                                                   
InetSocketAddress sniHost) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
         // Successfully connected to server
-        connectToAddress(unresolvedAddresses.next(), port, sniHost)
+        connectToAddress(unresolvedAddresses.next(), sniHost)
                 .thenAccept(future::complete)
                 .exceptionally(exception -> {
                     if (unresolvedAddresses.hasNext()) {
                         // Try next IP address
-                        connectToResolvedAddresses(unresolvedAddresses, port, 
sniHost).thenAccept(future::complete)
+                        connectToResolvedAddresses(unresolvedAddresses, 
sniHost).thenAccept(future::complete)
                                 .exceptionally(ex -> {
                                     // This is already unwinding the recursive 
call
                                     future.completeExceptionally(ex);
@@ -286,9 +288,9 @@ public class ConnectionPool implements AutoCloseable {
         return future;
     }
 
-    CompletableFuture<List<InetAddress>> resolveName(String hostname) {
-        CompletableFuture<List<InetAddress>> future = new 
CompletableFuture<>();
-        
dnsResolver.resolveAll(hostname).addListener((Future<List<InetAddress>> 
resolveFuture) -> {
+    CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress 
unresolvedAddress) {
+        CompletableFuture<List<InetSocketAddress>> future = new 
CompletableFuture<>();
+        
addressResolver.resolveAll(unresolvedAddress).addListener((Future<List<InetSocketAddress>>
 resolveFuture) -> {
             if (resolveFuture.isSuccess()) {
                 future.complete(resolveFuture.get());
             } else {
@@ -301,8 +303,7 @@ public class ConnectionPool implements AutoCloseable {
     /**
      * Attempt to establish a TCP connection to an already resolved single IP 
address.
      */
-    private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, 
int port, InetSocketAddress sniHost) {
-        InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, 
port);
+    private CompletableFuture<Channel> connectToAddress(InetSocketAddress 
remoteAddress, InetSocketAddress sniHost) {
         if (clientConfig.isUseTls()) {
             return toCompletableFuture(bootstrap.register())
                     .thenCompose(channel -> channelInitializerHandler
@@ -332,7 +333,7 @@ public class ConnectionPool implements AutoCloseable {
     public void close() throws Exception {
         closeAllConnections();
         if (shouldCloseDnsResolver) {
-            dnsResolver.close();
+            addressResolver.close();
         }
     }
 
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 39870f62af8..eeabced97b0 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -22,7 +22,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
-import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.resolver.dns.DnsAddressResolverGroup;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
@@ -41,7 +42,6 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
@@ -66,9 +66,6 @@ import org.slf4j.LoggerFactory;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
 import lombok.Getter;
 
 /**
@@ -82,7 +79,7 @@ public class ProxyConnection extends PulsarHandler {
     private final AtomicLong requestIdGenerator =
             new AtomicLong(ThreadLocalRandom.current().nextLong(0, 
Long.MAX_VALUE / 2));
     private final ProxyService service;
-    private final DnsNameResolver dnsNameResolver;
+    private final DnsAddressResolverGroup dnsAddressResolverGroup;
     AuthenticationDataSource authenticationData;
     private State state;
     private final Supplier<SslHandler> sslHandlerSupplier;
@@ -135,10 +132,10 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> 
sslHandlerSupplier,
-                           DnsNameResolver dnsNameResolver) {
+                           DnsAddressResolverGroup dnsAddressResolverGroup) {
         super(30, TimeUnit.SECONDS);
         this.service = proxyService;
-        this.dnsNameResolver = dnsNameResolver;
+        this.dnsAddressResolverGroup = dnsAddressResolverGroup;
         this.state = State.Init;
         this.sslHandlerSupplier = sslHandlerSupplier;
         this.brokerProxyValidator = service.getBrokerProxyValidator();
@@ -281,7 +278,8 @@ public class ProxyConnection extends PulsarHandler {
 
         if (this.connectionPool == null) {
             this.connectionPool = new ConnectionPool(clientConf, 
service.getWorkerGroup(),
-                    clientCnxSupplier, Optional.of(dnsNameResolver));
+                    clientCnxSupplier,
+                    
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())));
         } else {
             LOG.error("BUG! Connection Pool has already been created for proxy 
connection to {} state {} role {}",
                     remoteAddress, state, clientAuthRole);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 0f4ea152b50..10e122e794d 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -29,7 +29,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsAddressResolverGroup;
 import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Counter;
@@ -75,7 +75,7 @@ public class ProxyService implements Closeable {
     private final ProxyConfiguration proxyConfig;
     private final Authentication proxyClientAuthentication;
     @Getter
-    private final DnsNameResolver dnsNameResolver;
+    private final DnsAddressResolverGroup dnsAddressResolverGroup;
     @Getter
     private final BrokerProxyValidator brokerProxyValidator;
     private String serviceUrl;
@@ -152,13 +152,13 @@ public class ProxyService implements Closeable {
                 false, workersThreadFactory);
         this.authenticationService = authenticationService;
 
-        DnsNameResolverBuilder dnsNameResolverBuilder = new 
DnsNameResolverBuilder(workerGroup.next())
+        DnsNameResolverBuilder dnsNameResolverBuilder = new 
DnsNameResolverBuilder()
                 
.channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
         DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
 
-        dnsNameResolver = dnsNameResolverBuilder.build();
+        dnsAddressResolverGroup = new 
DnsAddressResolverGroup(dnsNameResolverBuilder);
 
-        brokerProxyValidator = new 
BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
+        brokerProxyValidator = new 
BrokerProxyValidator(dnsAddressResolverGroup.getResolver(workerGroup.next()),
                 proxyConfig.getBrokerProxyAllowedHostNames(),
                 proxyConfig.getBrokerProxyAllowedIPAddresses(),
                 proxyConfig.getBrokerProxyAllowedTargetPorts());
@@ -291,7 +291,7 @@ public class ProxyService implements Closeable {
     }
 
     public void close() throws IOException {
-        dnsNameResolver.close();
+        dnsAddressResolverGroup.close();
 
         if (discoveryProvider != null) {
             discoveryProvider.close();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 4f423c1f5d5..2ce2a93819f 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -174,7 +174,7 @@ public class ServiceChannelInitializer extends 
ChannelInitializer<SocketChannel>
         }
 
         ch.pipeline().addLast("handler",
-                new ProxyConnection(proxyService, sslHandlerSupplier, 
proxyService.getDnsNameResolver()));
+                new ProxyConnection(proxyService, sslHandlerSupplier, 
proxyService.getDnsAddressResolverGroup()));
 
     }
 }

Reply via email to