This is an automated email from the ASF dual-hosted git repository. mmarshall pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6f6130ed5af88d0a3144e4d8238d6986b0793520 Author: Michael Marshall <[email protected]> AuthorDate: Mon Jan 30 03:29:52 2023 -0600 [fix][client] Set fields earlier for correct ClientCnx initialization (#19327) (cherry picked from commit 3d8b52a9531185f3b273bc10dda07243abb30862) --- .../apache/pulsar/client/impl/ConnectionPool.java | 44 +++++++++++----------- .../client/impl/PulsarChannelInitializer.java | 25 ++++++++++++ 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 5089de06993..96da8bc79d9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -182,7 +182,7 @@ public class ConnectionPool implements AutoCloseable { final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>(); // Trigger async connect to broker - createConnection(physicalAddress).thenAccept(channel -> { + createConnection(logicalAddress, physicalAddress).thenAccept(channel -> { log.info("[{}] Connected to server", channel); channel.closeFuture().addListener(v -> { @@ -204,16 +204,6 @@ public class ConnectionPool implements AutoCloseable { return; } - if (!logicalAddress.equals(physicalAddress)) { - // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that - // it can be specified when sending the CommandConnect. - // That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after - // this method. - cnx.setTargetBroker(logicalAddress); - } - - cnx.setRemoteHostName(physicalAddress.getHostString()); - cnx.connectionFuture().thenRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] Connection handshake completed", cnx.channel()); @@ -241,7 +231,8 @@ public class ConnectionPool implements AutoCloseable { /** * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server. */ - private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) { + private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAddress, + InetSocketAddress unresolvedPhysicalAddress) { CompletableFuture<List<InetSocketAddress>> resolvedAddress; try { if (isSniProxy) { @@ -249,11 +240,11 @@ public class ConnectionPool implements AutoCloseable { resolvedAddress = resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(), proxyURI.getPort())); } else { - resolvedAddress = resolveName(unresolvedAddress); + resolvedAddress = resolveName(unresolvedPhysicalAddress); } return resolvedAddress.thenCompose( - inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), - isSniProxy ? unresolvedAddress : null)); + inetAddresses -> connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(), + isSniProxy ? unresolvedPhysicalAddress : null)); } catch (URISyntaxException e) { log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e); return FutureUtil @@ -265,17 +256,19 @@ public class ConnectionPool implements AutoCloseable { * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no * address is working. */ - private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses, + private CompletableFuture<Channel> connectToResolvedAddresses(InetSocketAddress logicalAddress, + Iterator<InetSocketAddress> resolvedPhysicalAddress, InetSocketAddress sniHost) { CompletableFuture<Channel> future = new CompletableFuture<>(); // Successfully connected to server - connectToAddress(unresolvedAddresses.next(), sniHost) + connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), sniHost) .thenAccept(future::complete) .exceptionally(exception -> { - if (unresolvedAddresses.hasNext()) { + if (resolvedPhysicalAddress.hasNext()) { // Try next IP address - connectToResolvedAddresses(unresolvedAddresses, sniHost).thenAccept(future::complete) + connectToResolvedAddresses(logicalAddress, resolvedPhysicalAddress, sniHost) + .thenAccept(future::complete) .exceptionally(ex -> { // This is already unwinding the recursive call future.completeExceptionally(ex); @@ -306,17 +299,22 @@ public class ConnectionPool implements AutoCloseable { /** * Attempt to establish a TCP connection to an already resolved single IP address. */ - private CompletableFuture<Channel> connectToAddress(InetSocketAddress remoteAddress, InetSocketAddress sniHost) { + private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress, + InetSocketAddress physicalAddress, InetSocketAddress sniHost) { if (clientConfig.isUseTls()) { return toCompletableFuture(bootstrap.register()) .thenCompose(channel -> channelInitializerHandler - .initTls(channel, sniHost != null ? sniHost : remoteAddress)) + .initTls(channel, sniHost != null ? sniHost : physicalAddress)) .thenCompose(channelInitializerHandler::initSocks5IfConfig) - .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress))); + .thenCompose(ch -> + channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress)) + .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); } else { return toCompletableFuture(bootstrap.register()) .thenCompose(channelInitializerHandler::initSocks5IfConfig) - .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress))); + .thenCompose(ch -> + channelInitializerHandler.initializeClientCnx(ch, logicalAddress, physicalAddress)) + .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress))); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index bac1cd9ba41..742d2dd7a73 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -42,6 +42,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder; +import org.apache.pulsar.common.util.netty.NettyFutureUtil; @Slf4j public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> { @@ -202,5 +203,29 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> return initSocks5Future; } + + CompletableFuture<Channel> initializeClientCnx(Channel ch, + InetSocketAddress logicalAddress, + InetSocketAddress resolvedPhysicalAddress) { + return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() -> { + final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler"); + + if (cnx == null) { + throw new IllegalStateException("Missing ClientCnx. This should not happen."); + } + + // Need to do our own equality because the physical address is resolved already + if (!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString()) + && logicalAddress.getPort() == resolvedPhysicalAddress.getPort())) { + // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that + // it can be specified when sending the CommandConnect. + cnx.setTargetBroker(logicalAddress); + } + + cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString()); + + return ch; + })); + } }
