This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2c4266efc4fd1076950acb56ca99c7ad24188fac Author: Nicolò Boschi <[email protected]> AuthorDate: Thu Feb 23 10:12:10 2023 +0100 [fix][client] Broker address resolution wrong if connect through a multi-dns names proxy (#19597) (cherry picked from commit e2863391e7f6f9b6c5060f0f78378493f8df37f3) (cherry picked from commit 14b070ba0753c12708dcd6e1ad79736280efc587) --- .../pulsar/client/impl/ConnectionPoolTest.java | 90 +++++++++++++++++++++- .../apache/pulsar/client/impl/ConnectionPool.java | 24 ++++-- .../client/impl/PulsarChannelInitializer.java | 8 +- 3 files changed, 107 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java index 30583bb64cd..f726a59366e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java @@ -20,11 +20,17 @@ package org.apache.pulsar.client.impl; import com.google.common.collect.Lists; import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AbstractAddressResolver; import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import java.util.stream.IntStream; +import io.netty.util.concurrent.Promise; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -68,7 +74,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { List<InetSocketAddress> result = Lists.newArrayList(); result.add(new InetSocketAddress("127.0.0.1", brokerPort)); Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name", - brokerPort))) + brokerPort))) .thenReturn(CompletableFuture.completedFuture(result)); client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create(); @@ -109,7 +115,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); InetSocketAddress brokerAddress = - InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); + InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); IntStream.range(1, 5).forEach(i -> { pool.getConnection(brokerAddress).thenAccept(cnx -> { Assert.assertTrue(cnx.channel().isActive()); @@ -121,6 +127,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { pool.closeAllConnections(); pool.close(); + eventLoop.shutdownGracefully(); } @Test @@ -131,7 +138,7 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); InetSocketAddress brokerAddress = - InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); + InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); IntStream.range(1, 10).forEach(i -> { pool.getConnection(brokerAddress).thenAccept(cnx -> { Assert.assertTrue(cnx.channel().isActive()); @@ -143,5 +150,82 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest { pool.closeAllConnections(); pool.close(); + eventLoop.shutdownGracefully(); + } + + + @Test + public void testSetProxyToTargetBrokerAddress() throws Exception { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setConnectionsPerBroker(5); + + + EventLoopGroup eventLoop = + EventLoopUtil.newEventLoopGroup(8, false, + new DefaultThreadFactory("test")); + + final AbstractAddressResolver resolver = new AbstractAddressResolver(eventLoop.next()) { + @Override + protected boolean doIsResolved(SocketAddress socketAddress) { + return !((InetSocketAddress) socketAddress).isUnresolved(); + } + + @Override + protected void doResolve(SocketAddress socketAddress, Promise promise) throws Exception { + promise.setFailure(new IllegalStateException()); + throw new IllegalStateException(); + } + + @Override + protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws Exception { + final InetSocketAddress socketAddress1 = (InetSocketAddress) socketAddress; + final boolean isProxy = socketAddress1.getHostName().equals("proxy"); + final boolean isBroker = socketAddress1.getHostName().equals("broker"); + if (!isProxy && !isBroker) { + promise.setFailure(new IllegalStateException()); + throw new IllegalStateException(); + } + List<InetSocketAddress> result = new ArrayList<>(); + if (isProxy) { + result.add(new InetSocketAddress("localhost", brokerPort)); + result.add(InetSocketAddress.createUnresolved("proxy", brokerPort)); + } else { + result.add(new InetSocketAddress("127.0.0.1", brokerPort)); + result.add(InetSocketAddress.createUnresolved("broker", brokerPort)); + } + promise.setSuccess(result); + } + }; + + ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop, + (Supplier<ClientCnx>) () -> new ClientCnx(conf, eventLoop), Optional.of(resolver)); + + + ClientCnx cnx = pool.getConnection( + InetSocketAddress.createUnresolved("proxy", 9999), + InetSocketAddress.createUnresolved("proxy", 9999)).get(); + Assert.assertEquals(cnx.remoteHostName, "proxy"); + Assert.assertNull(cnx.proxyToTargetBrokerAddress); + cnx.close(); + + cnx = pool.getConnection( + InetSocketAddress.createUnresolved("broker", 9999), + InetSocketAddress.createUnresolved("proxy", 9999)).get(); + Assert.assertEquals(cnx.remoteHostName, "proxy"); + Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999"); + cnx.close(); + + + cnx = pool.getConnection( + InetSocketAddress.createUnresolved("broker", 9999), + InetSocketAddress.createUnresolved("broker", 9999)).get(); + Assert.assertEquals(cnx.remoteHostName, "broker"); + Assert.assertNull(cnx.proxyToTargetBrokerAddress); + cnx.close(); + + + pool.closeAllConnections(); + pool.close(); + eventLoop.shutdownGracefully(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 96da8bc79d9..5fc9daecd90 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -243,8 +243,12 @@ public class ConnectionPool implements AutoCloseable { resolvedAddress = resolveName(unresolvedPhysicalAddress); } return resolvedAddress.thenCompose( - inetAddresses -> connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(), - isSniProxy ? unresolvedPhysicalAddress : null)); + inetAddresses -> connectToResolvedAddresses( + logicalAddress, + unresolvedPhysicalAddress, + inetAddresses.iterator(), + isSniProxy ? unresolvedPhysicalAddress : null) + ); } catch (URISyntaxException e) { log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e); return FutureUtil @@ -257,17 +261,19 @@ public class ConnectionPool implements AutoCloseable { * address is working. */ private CompletableFuture<Channel> connectToResolvedAddresses(InetSocketAddress logicalAddress, + InetSocketAddress unresolvedPhysicalAddress, Iterator<InetSocketAddress> resolvedPhysicalAddress, InetSocketAddress sniHost) { CompletableFuture<Channel> future = new CompletableFuture<>(); // Successfully connected to server - connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), sniHost) + connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), unresolvedPhysicalAddress, sniHost) .thenAccept(future::complete) .exceptionally(exception -> { if (resolvedPhysicalAddress.hasNext()) { // Try next IP address - connectToResolvedAddresses(logicalAddress, resolvedPhysicalAddress, sniHost) + connectToResolvedAddresses(logicalAddress, unresolvedPhysicalAddress, + resolvedPhysicalAddress, sniHost) .thenAccept(future::complete) .exceptionally(ex -> { // This is already unwinding the recursive call @@ -300,20 +306,24 @@ public class ConnectionPool implements AutoCloseable { * Attempt to establish a TCP connection to an already resolved single IP address. */ private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress, - InetSocketAddress physicalAddress, InetSocketAddress sniHost) { + InetSocketAddress physicalAddress, + InetSocketAddress unresolvedPhysicalAddress, + InetSocketAddress sniHost) { if (clientConfig.isUseTls()) { return toCompletableFuture(bootstrap.register()) .thenCompose(channel -> channelInitializerHandler .initTls(channel, sniHost != null ? sniHost : physicalAddress)) .thenCompose(channelInitializerHandler::initSocks5IfConfig) .thenCompose(ch -> - channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress)) + channelInitializerHandler.initializeClientCnx(ch, logicalAddress, + unresolvedPhysicalAddress)) .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); } else { return toCompletableFuture(bootstrap.register()) .thenCompose(channelInitializerHandler::initSocks5IfConfig) .thenCompose(ch -> - channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress)) + channelInitializerHandler.initializeClientCnx(ch, logicalAddress, + unresolvedPhysicalAddress)) .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index 742d2dd7a73..178cf97ef98 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -206,7 +206,7 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> CompletableFuture<Channel> initializeClientCnx(Channel ch, InetSocketAddress logicalAddress, - InetSocketAddress resolvedPhysicalAddress) { + InetSocketAddress unresolvedPhysicalAddress) { return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> { final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler"); @@ -214,15 +214,13 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> throw new IllegalStateException("Missing ClientCnx. This should not happen."); } - // Need to do our own equality because the physical address is resolved already - if (!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString()) - && logicalAddress.getPort() == resolvedPhysicalAddress.getPort())) { + if (!logicalAddress.equals(unresolvedPhysicalAddress)) { // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that // it can be specified when sending the CommandConnect. cnx.setTargetBroker(logicalAddress); } - cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString()); + cnx.setRemoteHostName(unresolvedPhysicalAddress.getHostString()); return ch; }));
