This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2c4266efc4fd1076950acb56ca99c7ad24188fac
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Feb 23 10:12:10 2023 +0100

    [fix][client] Broker address resolution wrong if connect through a 
multi-dns names proxy (#19597)
    
    (cherry picked from commit e2863391e7f6f9b6c5060f0f78378493f8df37f3)
    (cherry picked from commit 14b070ba0753c12708dcd6e1ad79736280efc587)
---
 .../pulsar/client/impl/ConnectionPoolTest.java     | 90 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConnectionPool.java  | 24 ++++--
 .../client/impl/PulsarChannelInitializer.java      |  8 +-
 3 files changed, 107 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 30583bb64cd..f726a59366e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -20,11 +20,17 @@ package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Lists;
 import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.AbstractAddressResolver;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
+import io.netty.util.concurrent.Promise;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -68,7 +74,7 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
         List<InetSocketAddress> result = Lists.newArrayList();
         result.add(new InetSocketAddress("127.0.0.1", brokerPort));
         
Mockito.when(pool.resolveName(InetSocketAddress.createUnresolved("non-existing-dns-name",
-                brokerPort)))
+                        brokerPort)))
                 .thenReturn(CompletableFuture.completedFuture(result));
 
         
client.newProducer().topic("persistent://sample/standalone/ns/my-topic").create();
@@ -109,7 +115,7 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
         ConnectionPool pool = 
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
 
         InetSocketAddress brokerAddress =
-            InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
+                InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
         IntStream.range(1, 5).forEach(i -> {
             pool.getConnection(brokerAddress).thenAccept(cnx -> {
                 Assert.assertTrue(cnx.channel().isActive());
@@ -121,6 +127,7 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
 
         pool.closeAllConnections();
         pool.close();
+        eventLoop.shutdownGracefully();
     }
 
     @Test
@@ -131,7 +138,7 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
         ConnectionPool pool = 
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
 
         InetSocketAddress brokerAddress =
-            InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
+                InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
         IntStream.range(1, 10).forEach(i -> {
             pool.getConnection(brokerAddress).thenAccept(cnx -> {
                 Assert.assertTrue(cnx.channel().isActive());
@@ -143,5 +150,82 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
 
         pool.closeAllConnections();
         pool.close();
+        eventLoop.shutdownGracefully();
+    }
+
+
+    @Test
+    public void testSetProxyToTargetBrokerAddress() throws Exception {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setConnectionsPerBroker(5);
+
+
+        EventLoopGroup eventLoop =
+                EventLoopUtil.newEventLoopGroup(8, false,
+                        new DefaultThreadFactory("test"));
+
+        final AbstractAddressResolver resolver = new 
AbstractAddressResolver(eventLoop.next()) {
+            @Override
+            protected boolean doIsResolved(SocketAddress socketAddress) {
+                return !((InetSocketAddress) socketAddress).isUnresolved();
+            }
+
+            @Override
+            protected void doResolve(SocketAddress socketAddress, Promise 
promise) throws Exception {
+                promise.setFailure(new IllegalStateException());
+                throw new IllegalStateException();
+            }
+
+            @Override
+            protected void doResolveAll(SocketAddress socketAddress, Promise 
promise) throws Exception {
+                final InetSocketAddress socketAddress1 = (InetSocketAddress) 
socketAddress;
+                final boolean isProxy = 
socketAddress1.getHostName().equals("proxy");
+                final boolean isBroker = 
socketAddress1.getHostName().equals("broker");
+                if (!isProxy && !isBroker) {
+                    promise.setFailure(new IllegalStateException());
+                    throw new IllegalStateException();
+                }
+                List<InetSocketAddress> result = new ArrayList<>();
+                if (isProxy) {
+                    result.add(new InetSocketAddress("localhost", brokerPort));
+                    result.add(InetSocketAddress.createUnresolved("proxy", 
brokerPort));
+                } else {
+                    result.add(new InetSocketAddress("127.0.0.1", brokerPort));
+                    result.add(InetSocketAddress.createUnresolved("broker", 
brokerPort));
+                }
+                promise.setSuccess(result);
+            }
+        };
+
+        ConnectionPool pool = 
spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop,
+                (Supplier<ClientCnx>) () -> new ClientCnx(conf, eventLoop), 
Optional.of(resolver));
+
+
+        ClientCnx cnx = pool.getConnection(
+                InetSocketAddress.createUnresolved("proxy", 9999),
+                InetSocketAddress.createUnresolved("proxy", 9999)).get();
+        Assert.assertEquals(cnx.remoteHostName, "proxy");
+        Assert.assertNull(cnx.proxyToTargetBrokerAddress);
+        cnx.close();
+
+        cnx = pool.getConnection(
+                InetSocketAddress.createUnresolved("broker", 9999),
+                InetSocketAddress.createUnresolved("proxy", 9999)).get();
+        Assert.assertEquals(cnx.remoteHostName, "proxy");
+        Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999");
+        cnx.close();
+
+
+        cnx = pool.getConnection(
+                InetSocketAddress.createUnresolved("broker", 9999),
+                InetSocketAddress.createUnresolved("broker", 9999)).get();
+        Assert.assertEquals(cnx.remoteHostName, "broker");
+        Assert.assertNull(cnx.proxyToTargetBrokerAddress);
+        cnx.close();
+
+
+        pool.closeAllConnections();
+        pool.close();
+        eventLoop.shutdownGracefully();
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 96da8bc79d9..5fc9daecd90 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -243,8 +243,12 @@ public class ConnectionPool implements AutoCloseable {
                 resolvedAddress = resolveName(unresolvedPhysicalAddress);
             }
             return resolvedAddress.thenCompose(
-                    inetAddresses -> 
connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(),
-                            isSniProxy ? unresolvedPhysicalAddress : null));
+                    inetAddresses -> connectToResolvedAddresses(
+                            logicalAddress,
+                            unresolvedPhysicalAddress,
+                            inetAddresses.iterator(),
+                            isSniProxy ? unresolvedPhysicalAddress : null)
+            );
         } catch (URISyntaxException e) {
             log.error("Invalid Proxy url {}", 
clientConfig.getProxyServiceUrl(), e);
             return FutureUtil
@@ -257,17 +261,19 @@ public class ConnectionPool implements AutoCloseable {
      * address is working.
      */
     private CompletableFuture<Channel> 
connectToResolvedAddresses(InetSocketAddress logicalAddress,
+                                                                  
InetSocketAddress unresolvedPhysicalAddress,
                                                                   
Iterator<InetSocketAddress> resolvedPhysicalAddress,
                                                                   
InetSocketAddress sniHost) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
         // Successfully connected to server
-        connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), 
sniHost)
+        connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), 
unresolvedPhysicalAddress, sniHost)
                 .thenAccept(future::complete)
                 .exceptionally(exception -> {
                     if (resolvedPhysicalAddress.hasNext()) {
                         // Try next IP address
-                        connectToResolvedAddresses(logicalAddress, 
resolvedPhysicalAddress, sniHost)
+                        connectToResolvedAddresses(logicalAddress, 
unresolvedPhysicalAddress,
+                                resolvedPhysicalAddress, sniHost)
                                 .thenAccept(future::complete)
                                 .exceptionally(ex -> {
                                     // This is already unwinding the recursive 
call
@@ -300,20 +306,24 @@ public class ConnectionPool implements AutoCloseable {
      * Attempt to establish a TCP connection to an already resolved single IP 
address.
      */
     private CompletableFuture<Channel> connectToAddress(InetSocketAddress 
logicalAddress,
-                                                        InetSocketAddress 
physicalAddress, InetSocketAddress sniHost) {
+                                                        InetSocketAddress 
physicalAddress,
+                                                        InetSocketAddress 
unresolvedPhysicalAddress,
+                                                        InetSocketAddress 
sniHost) {
         if (clientConfig.isUseTls()) {
             return toCompletableFuture(bootstrap.register())
                     .thenCompose(channel -> channelInitializerHandler
                             .initTls(channel, sniHost != null ? sniHost : 
physicalAddress))
                     .thenCompose(channelInitializerHandler::initSocks5IfConfig)
                     .thenCompose(ch ->
-                            channelInitializerHandler.initializeClientCnx(ch, 
logicalAddress, physicalAddress))
+                            channelInitializerHandler.initializeClientCnx(ch, 
logicalAddress,
+                                    unresolvedPhysicalAddress))
                     .thenCompose(channel -> 
toCompletableFuture(channel.connect(physicalAddress)));
         } else {
             return toCompletableFuture(bootstrap.register())
                     .thenCompose(channelInitializerHandler::initSocks5IfConfig)
                     .thenCompose(ch ->
-                            channelInitializerHandler.initializeClientCnx(ch, 
logicalAddress, physicalAddress))
+                            channelInitializerHandler.initializeClientCnx(ch, 
logicalAddress,
+                                    unresolvedPhysicalAddress))
                     .thenCompose(channel -> 
toCompletableFuture(channel.connect(physicalAddress)));
         }
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index 742d2dd7a73..178cf97ef98 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -206,7 +206,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
 
     CompletableFuture<Channel> initializeClientCnx(Channel ch,
                                                    InetSocketAddress 
logicalAddress,
-                                                   InetSocketAddress 
resolvedPhysicalAddress) {
+                                                   InetSocketAddress 
unresolvedPhysicalAddress) {
         return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> 
{
             final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");
 
@@ -214,15 +214,13 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
                 throw new IllegalStateException("Missing ClientCnx. This 
should not happen.");
             }
 
-            // Need to do our own equality because the physical address is 
resolved already
-            if 
(!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString())
-                    && logicalAddress.getPort() == 
resolvedPhysicalAddress.getPort())) {
+            if (!logicalAddress.equals(unresolvedPhysicalAddress)) {
                 // We are connecting through a proxy. We need to set the 
target broker in the ClientCnx object so that
                 // it can be specified when sending the CommandConnect.
                 cnx.setTargetBroker(logicalAddress);
             }
 
-            cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());
+            cnx.setRemoteHostName(unresolvedPhysicalAddress.getHostString());
 
             return ch;
         }));

Reply via email to