This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6f6130ed5af88d0a3144e4d8238d6986b0793520
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Jan 30 03:29:52 2023 -0600

    [fix][client] Set fields earlier for correct ClientCnx initialization 
(#19327)
    
    (cherry picked from commit 3d8b52a9531185f3b273bc10dda07243abb30862)
---
 .../apache/pulsar/client/impl/ConnectionPool.java  | 44 +++++++++++-----------
 .../client/impl/PulsarChannelInitializer.java      | 25 ++++++++++++
 2 files changed, 46 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 5089de06993..96da8bc79d9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -182,7 +182,7 @@ public class ConnectionPool implements AutoCloseable {
         final CompletableFuture<ClientCnx> cnxFuture = new 
CompletableFuture<>();
 
         // Trigger async connect to broker
-        createConnection(physicalAddress).thenAccept(channel -> {
+        createConnection(logicalAddress, physicalAddress).thenAccept(channel 
-> {
             log.info("[{}] Connected to server", channel);
 
             channel.closeFuture().addListener(v -> {
@@ -204,16 +204,6 @@ public class ConnectionPool implements AutoCloseable {
                 return;
             }
 
-            if (!logicalAddress.equals(physicalAddress)) {
-                // We are connecting through a proxy. We need to set the 
target broker in the ClientCnx object so that
-                // it can be specified when sending the CommandConnect.
-                // That phase will happen in the ClientCnx.connectionActive() 
which will be invoked immediately after
-                // this method.
-                cnx.setTargetBroker(logicalAddress);
-            }
-
-            cnx.setRemoteHostName(physicalAddress.getHostString());
-
             cnx.connectionFuture().thenRun(() -> {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Connection handshake completed", 
cnx.channel());
@@ -241,7 +231,8 @@ public class ConnectionPool implements AutoCloseable {
     /**
      * Resolve DNS asynchronously and attempt to connect to any IP address 
returned by DNS server.
      */
-    private CompletableFuture<Channel> createConnection(InetSocketAddress 
unresolvedAddress) {
+    private CompletableFuture<Channel> createConnection(InetSocketAddress 
logicalAddress,
+                                                        InetSocketAddress 
unresolvedPhysicalAddress) {
         CompletableFuture<List<InetSocketAddress>> resolvedAddress;
         try {
             if (isSniProxy) {
@@ -249,11 +240,11 @@ public class ConnectionPool implements AutoCloseable {
                 resolvedAddress =
                         
resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), 
proxyURI.getPort()));
             } else {
-                resolvedAddress = resolveName(unresolvedAddress);
+                resolvedAddress = resolveName(unresolvedPhysicalAddress);
             }
             return resolvedAddress.thenCompose(
-                    inetAddresses -> 
connectToResolvedAddresses(inetAddresses.iterator(),
-                            isSniProxy ? unresolvedAddress : null));
+                    inetAddresses -> 
connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(),
+                            isSniProxy ? unresolvedPhysicalAddress : null));
         } catch (URISyntaxException e) {
             log.error("Invalid Proxy url {}", 
clientConfig.getProxyServiceUrl(), e);
             return FutureUtil
@@ -265,17 +256,19 @@ public class ConnectionPool implements AutoCloseable {
      * Try to connect to a sequence of IP addresses until a successful 
connection can be made, or fail if no
      * address is working.
      */
-    private CompletableFuture<Channel> 
connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses,
+    private CompletableFuture<Channel> 
connectToResolvedAddresses(InetSocketAddress logicalAddress,
+                                                                  
Iterator<InetSocketAddress> resolvedPhysicalAddress,
                                                                   
InetSocketAddress sniHost) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
         // Successfully connected to server
-        connectToAddress(unresolvedAddresses.next(), sniHost)
+        connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), 
sniHost)
                 .thenAccept(future::complete)
                 .exceptionally(exception -> {
-                    if (unresolvedAddresses.hasNext()) {
+                    if (resolvedPhysicalAddress.hasNext()) {
                         // Try next IP address
-                        connectToResolvedAddresses(unresolvedAddresses, 
sniHost).thenAccept(future::complete)
+                        connectToResolvedAddresses(logicalAddress, 
resolvedPhysicalAddress, sniHost)
+                                .thenAccept(future::complete)
                                 .exceptionally(ex -> {
                                     // This is already unwinding the recursive 
call
                                     future.completeExceptionally(ex);
@@ -306,17 +299,22 @@ public class ConnectionPool implements AutoCloseable {
     /**
      * Attempt to establish a TCP connection to an already resolved single IP 
address.
      */
-    private CompletableFuture<Channel> connectToAddress(InetSocketAddress 
remoteAddress, InetSocketAddress sniHost) {
+    private CompletableFuture<Channel> connectToAddress(InetSocketAddress 
logicalAddress,
+                                                        InetSocketAddress 
physicalAddress, InetSocketAddress sniHost) {
         if (clientConfig.isUseTls()) {
             return toCompletableFuture(bootstrap.register())
                     .thenCompose(channel -> channelInitializerHandler
-                            .initTls(channel, sniHost != null ? sniHost : 
remoteAddress))
+                            .initTls(channel, sniHost != null ? sniHost : 
physicalAddress))
                     .thenCompose(channelInitializerHandler::initSocks5IfConfig)
-                    .thenCompose(channel -> 
toCompletableFuture(channel.connect(remoteAddress)));
+                    .thenCompose(ch ->
+                            channelInitializerHandler.initializeClientCnx(ch, 
logicalAddress, physicalAddress))
+                    .thenCompose(channel -> 
toCompletableFuture(channel.connect(physicalAddress)));
         } else {
             return toCompletableFuture(bootstrap.register())
                     .thenCompose(channelInitializerHandler::initSocks5IfConfig)
-                    .thenCompose(channel -> 
toCompletableFuture(channel.connect(remoteAddress)));
+                    .thenCompose(ch ->
+                            channelInitializerHandler.initializeClientCnx(ch, 
logicalAddress, physicalAddress))
+                    .thenCompose(channel -> 
toCompletableFuture(channel.connect(physicalAddress)));
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index bac1cd9ba41..742d2dd7a73 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.SecurityUtility;
 import 
org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.netty.NettyFutureUtil;
 
 @Slf4j
 public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel> {
@@ -202,5 +203,29 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
 
         return initSocks5Future;
     }
+
+    CompletableFuture<Channel> initializeClientCnx(Channel ch,
+                                                   InetSocketAddress 
logicalAddress,
+                                                   InetSocketAddress 
resolvedPhysicalAddress) {
+        return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> 
{
+            final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");
+
+            if (cnx == null) {
+                throw new IllegalStateException("Missing ClientCnx. This 
should not happen.");
+            }
+
+            // Need to do our own equality because the physical address is 
resolved already
+            if 
(!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString())
+                    && logicalAddress.getPort() == 
resolvedPhysicalAddress.getPort())) {
+                // We are connecting through a proxy. We need to set the 
target broker in the ClientCnx object so that
+                // it can be specified when sending the CommandConnect.
+                cnx.setTargetBroker(logicalAddress);
+            }
+
+            cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());
+
+            return ch;
+        }));
+    }
 }
 

Reply via email to