This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3673973a89525f89da27ec6f3c875e8658be5ad3 Author: Lari Hotari <[email protected]> AuthorDate: Tue May 3 05:59:19 2022 +0300 [Proxy/Client] Fix DNS server denial-of-service issue when DNS entry expires (#15403) (cherry picked from commit 40d71691dab2a09d3457f8fa638b19ebc2e28dd7) --- .../pulsar/client/impl/ConnectionPoolTest.java | 38 ++++++++-------- .../apache/pulsar/client/impl/ConnectionPool.java | 53 +++++++++++----------- .../pulsar/proxy/server/ProxyConnection.java | 16 +++---- .../apache/pulsar/proxy/server/ProxyService.java | 12 ++--- .../proxy/server/ServiceChannelInitializer.java | 2 +- 5 files changed, 61 insertions(+), 60 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java index 0193f592c75..9fbea7f2914 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java @@ -22,6 +22,10 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor import com.google.common.collect.Lists; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -31,22 +35,18 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.IntStream; - @Test(groups = "broker-impl") public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { String serviceUrl; + int brokerPort; @BeforeClass @Override protected void setup() throws Exception { super.internalSetup(); - serviceUrl = "pulsar://non-existing-dns-name:" + pulsar.getBrokerListenPort().get(); + brokerPort = pulsar.getBrokerListenPort().get(); + serviceUrl = "pulsar://non-existing-dns-name:" + brokerPort; } @AfterClass(alwaysRun = true) @@ -63,9 +63,11 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { conf.setServiceUrl(serviceUrl); PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool); - List<InetAddress> result = Lists.newArrayList(); - result.add(InetAddress.getByName("127.0.0.1")); - Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result)); + List<InetSocketAddress> result = Lists.newArrayList(); + result.add(new InetSocketAddress("127.0.0.1", brokerPort)); + Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name", + brokerPort))) + .thenReturn(CompletableFuture.completedFuture(result)); client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create(); @@ -75,20 +77,20 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { @Test public void testDoubleIpAddress() throws Exception { - String serviceUrl = "pulsar://non-existing-dns-name:" + pulsar.getBrokerListenPort().get(); - ClientConfigurationData conf = new ClientConfigurationData(); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test")); ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); conf.setServiceUrl(serviceUrl); PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool); - List<InetAddress> result = Lists.newArrayList(); + List<InetSocketAddress> result = Lists.newArrayList(); // Add a non existent IP to the response to check that we're trying the 2nd address as well - result.add(InetAddress.getByName("127.0.0.99")); - result.add(InetAddress.getByName("127.0.0.1")); - Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result)); + result.add(new InetSocketAddress("127.0.0.99", brokerPort)); + result.add(new InetSocketAddress("127.0.0.1", brokerPort)); + Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name", + brokerPort))) + .thenReturn(CompletableFuture.completedFuture(result)); // Create producer should succeed by trying the 2nd IP client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create(); @@ -105,7 +107,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); InetSocketAddress brokerAddress = - InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get()); + InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); IntStream.range(1, 5).forEach(i -> { pool.getConnection(brokerAddress).thenAccept(cnx -> { Assert.assertTrue(cnx.channel().isActive()); @@ -127,7 +129,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); InetSocketAddress brokerAddress = - InetSocketAddress.createUnresolved("127.0.0.1", pulsar.getBrokerListenPort().get()); + InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); IntStream.range(1, 10).forEach(i -> { pool.getConnection(brokerAddress).thenAccept(cnx -> { Assert.assertTrue(cnx.channel().isActive()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 7a077b2e45b..e3771e7449e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -26,10 +26,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; -import io.netty.resolver.dns.DnsNameResolver; +import io.netty.resolver.AddressResolver; +import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.Future; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -63,7 +63,7 @@ public class ConnectionPool implements AutoCloseable { private final int maxConnectionsPerHosts; private final boolean isSniProxy; - protected final DnsNameResolver dnsResolver; + protected final AddressResolver<InetSocketAddress> addressResolver; private final boolean shouldCloseDnsResolver; public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { @@ -76,7 +76,8 @@ public class ConnectionPool implements AutoCloseable { } public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, - Supplier<ClientCnx> clientCnxSupplier, Optional<DnsNameResolver> dnsNameResolver) + Supplier<ClientCnx> clientCnxSupplier, + Optional<AddressResolver<InetSocketAddress>> addressResolver) throws PulsarClientException { this.eventLoopGroup = eventLoopGroup; this.clientConfig = conf; @@ -101,15 +102,19 @@ public class ConnectionPool implements AutoCloseable { throw new PulsarClientException(e); } - this.shouldCloseDnsResolver = !dnsNameResolver.isPresent(); - this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup)); + this.shouldCloseDnsResolver = !addressResolver.isPresent(); + this.addressResolver = addressResolver.orElseGet(() -> createAddressResolver(conf, eventLoopGroup)); } - private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { - DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next()) + private static AddressResolver<InetSocketAddress> createAddressResolver(ClientConfigurationData conf, + EventLoopGroup eventLoopGroup) { + DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder() .traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)); DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); - return dnsNameResolverBuilder.build(); + // use DnsAddressResolverGroup to create the AddressResolver since it contains a solution + // to prevent cache stampede / thundering herds problem when a DNS entry expires while the system + // is under high load + return new DnsAddressResolverGroup(dnsNameResolverBuilder).getResolver(eventLoopGroup.next()); } private static final Random random = new Random(); @@ -234,19 +239,17 @@ public class ConnectionPool implements AutoCloseable { * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server. */ private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) { - int port; - CompletableFuture<List<InetAddress>> resolvedAddress; + CompletableFuture<List<InetSocketAddress>> resolvedAddress; try { if (isSniProxy) { URI proxyURI = new URI(clientConfig.getProxyServiceUrl()); - port = proxyURI.getPort(); - resolvedAddress = resolveName(proxyURI.getHost()); + resolvedAddress = + resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), proxyURI.getPort())); } else { - port = unresolvedAddress.getPort(); - resolvedAddress = resolveName(unresolvedAddress.getHostString()); + resolvedAddress = resolveName(unresolvedAddress); } return resolvedAddress.thenCompose( - inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port, + inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), isSniProxy ? unresolvedAddress : null)); } catch (URISyntaxException e) { log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e); @@ -259,18 +262,17 @@ public class ConnectionPool implements AutoCloseable { * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no * address is working. */ - private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, - int port, + private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses, InetSocketAddress sniHost) { CompletableFuture<Channel> future = new CompletableFuture<>(); // Successfully connected to server - connectToAddress(unresolvedAddresses.next(), port, sniHost) + connectToAddress(unresolvedAddresses.next(), sniHost) .thenAccept(future::complete) .exceptionally(exception -> { if (unresolvedAddresses.hasNext()) { // Try next IP address - connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete) + connectToResolvedAddresses(unresolvedAddresses, sniHost).thenAccept(future::complete) .exceptionally(ex -> { // This is already unwinding the recursive call future.completeExceptionally(ex); @@ -286,9 +288,9 @@ public class ConnectionPool implements AutoCloseable { return future; } - CompletableFuture<List<InetAddress>> resolveName(String hostname) { - CompletableFuture<List<InetAddress>> future = new CompletableFuture<>(); - dnsResolver.resolveAll(hostname).addListener((Future<List<InetAddress>> resolveFuture) -> { + CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress unresolvedAddress) { + CompletableFuture<List<InetSocketAddress>> future = new CompletableFuture<>(); + addressResolver.resolveAll(unresolvedAddress).addListener((Future<List<InetSocketAddress>> resolveFuture) -> { if (resolveFuture.isSuccess()) { future.complete(resolveFuture.get()); } else { @@ -301,8 +303,7 @@ public class ConnectionPool implements AutoCloseable { /** * Attempt to establish a TCP connection to an already resolved single IP address. */ - private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) { - InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port); + private CompletableFuture<Channel> connectToAddress(InetSocketAddress remoteAddress, InetSocketAddress sniHost) { if (clientConfig.isUseTls()) { return toCompletableFuture(bootstrap.register()) .thenCompose(channel -> channelInitializerHandler @@ -329,7 +330,7 @@ public class ConnectionPool implements AutoCloseable { public void close() throws Exception { closeAllConnections(); if (shouldCloseDnsResolver) { - dnsResolver.close(); + addressResolver.close(); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 39870f62af8..eeabced97b0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -22,7 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import io.netty.channel.ChannelFutureListener; import io.netty.handler.codec.haproxy.HAProxyMessage; -import io.netty.resolver.dns.DnsNameResolver; +import io.netty.handler.ssl.SslHandler; +import io.netty.resolver.dns.DnsAddressResolverGroup; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; @@ -41,7 +42,6 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; @@ -66,9 +66,6 @@ import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import lombok.Getter; /** @@ -82,7 +79,7 @@ public class ProxyConnection extends PulsarHandler { private final AtomicLong requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); private final ProxyService service; - private final DnsNameResolver dnsNameResolver; + private final DnsAddressResolverGroup dnsAddressResolverGroup; AuthenticationDataSource authenticationData; private State state; private final Supplier<SslHandler> sslHandlerSupplier; @@ -135,10 +132,10 @@ public class ProxyConnection extends PulsarHandler { } public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier, - DnsNameResolver dnsNameResolver) { + DnsAddressResolverGroup dnsAddressResolverGroup) { super(30, TimeUnit.SECONDS); this.service = proxyService; - this.dnsNameResolver = dnsNameResolver; + this.dnsAddressResolverGroup = dnsAddressResolverGroup; this.state = State.Init; this.sslHandlerSupplier = sslHandlerSupplier; this.brokerProxyValidator = service.getBrokerProxyValidator(); @@ -281,7 +278,8 @@ public class ProxyConnection extends PulsarHandler { if (this.connectionPool == null) { this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(), - clientCnxSupplier, Optional.of(dnsNameResolver)); + clientCnxSupplier, + Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next()))); } else { LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", remoteAddress, state, clientAuthRole); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 31c07299bf3..4e65e5b506c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -26,7 +26,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; -import io.netty.resolver.dns.DnsNameResolver; +import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.DefaultThreadFactory; import io.prometheus.client.Counter; @@ -77,7 +77,7 @@ public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; private final Authentication proxyClientAuthentication; @Getter - private final DnsNameResolver dnsNameResolver; + private final DnsAddressResolverGroup dnsAddressResolverGroup; @Getter private final BrokerProxyValidator brokerProxyValidator; private String serviceUrl; @@ -154,13 +154,13 @@ public class ProxyService implements Closeable { false, workersThreadFactory); this.authenticationService = authenticationService; - DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next()) + DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder() .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup)); DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); - dnsNameResolver = dnsNameResolverBuilder.build(); + dnsAddressResolverGroup = new DnsAddressResolverGroup(dnsNameResolverBuilder); - brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(), + brokerProxyValidator = new BrokerProxyValidator(dnsAddressResolverGroup.getResolver(workerGroup.next()), proxyConfig.getBrokerProxyAllowedHostNames(), proxyConfig.getBrokerProxyAllowedIPAddresses(), proxyConfig.getBrokerProxyAllowedTargetPorts()); @@ -252,7 +252,7 @@ public class ProxyService implements Closeable { } public void close() throws IOException { - dnsNameResolver.close(); + dnsAddressResolverGroup.close(); if (discoveryProvider != null) { discoveryProvider.close(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index 4f423c1f5d5..2ce2a93819f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -174,7 +174,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel> } ch.pipeline().addLast("handler", - new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver())); + new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsAddressResolverGroup())); } }
