This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d8b52a9531 [fix][client] Set fields earlier for correct ClientCnx
initialization (#19327)
3d8b52a9531 is described below
commit 3d8b52a9531185f3b273bc10dda07243abb30862
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Jan 30 03:29:52 2023 -0600
[fix][client] Set fields earlier for correct ClientCnx initialization
(#19327)
---
.../apache/pulsar/client/impl/ConnectionPool.java | 44 +++++++++++-----------
.../client/impl/PulsarChannelInitializer.java | 25 ++++++++++++
2 files changed, 46 insertions(+), 23 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 5e0a592cdc6..2e105b53284 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -244,7 +244,7 @@ public class ConnectionPool implements AutoCloseable {
final CompletableFuture<ClientCnx> cnxFuture = new
CompletableFuture<>();
// Trigger async connect to broker
- createConnection(physicalAddress).thenAccept(channel -> {
+ createConnection(logicalAddress, physicalAddress).thenAccept(channel
-> {
log.info("[{}] Connected to server", channel);
channel.closeFuture().addListener(v -> {
@@ -266,16 +266,6 @@ public class ConnectionPool implements AutoCloseable {
return;
}
- if (!logicalAddress.equals(physicalAddress)) {
- // We are connecting through a proxy. We need to set the
target broker in the ClientCnx object so that
- // it can be specified when sending the CommandConnect.
- // That phase will happen in the ClientCnx.connectionActive()
which will be invoked immediately after
- // this method.
- cnx.setTargetBroker(logicalAddress);
- }
-
- cnx.setRemoteHostName(physicalAddress.getHostString());
-
cnx.connectionFuture().thenRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Connection handshake completed",
cnx.channel());
@@ -303,7 +293,8 @@ public class ConnectionPool implements AutoCloseable {
/**
* Resolve DNS asynchronously and attempt to connect to any IP address
returned by DNS server.
*/
- private CompletableFuture<Channel> createConnection(InetSocketAddress
unresolvedAddress) {
+ private CompletableFuture<Channel> createConnection(InetSocketAddress
logicalAddress,
+ InetSocketAddress
unresolvedPhysicalAddress) {
CompletableFuture<List<InetSocketAddress>> resolvedAddress;
try {
if (isSniProxy) {
@@ -311,11 +302,11 @@ public class ConnectionPool implements AutoCloseable {
resolvedAddress =
resolveName(InetSocketAddress.createUnresolved(proxyURI.getHost(),
proxyURI.getPort()));
} else {
- resolvedAddress = resolveName(unresolvedAddress);
+ resolvedAddress = resolveName(unresolvedPhysicalAddress);
}
return resolvedAddress.thenCompose(
- inetAddresses ->
connectToResolvedAddresses(inetAddresses.iterator(),
- isSniProxy ? unresolvedAddress : null));
+ inetAddresses ->
connectToResolvedAddresses(logicalAddress, inetAddresses.iterator(),
+ isSniProxy ? unresolvedPhysicalAddress : null));
} catch (URISyntaxException e) {
log.error("Invalid Proxy url {}",
clientConfig.getProxyServiceUrl(), e);
return FutureUtil
@@ -327,17 +318,19 @@ public class ConnectionPool implements AutoCloseable {
* Try to connect to a sequence of IP addresses until a successful
connection can be made, or fail if no
* address is working.
*/
- private CompletableFuture<Channel>
connectToResolvedAddresses(Iterator<InetSocketAddress> unresolvedAddresses,
+ private CompletableFuture<Channel>
connectToResolvedAddresses(InetSocketAddress logicalAddress,
+
Iterator<InetSocketAddress> resolvedPhysicalAddress,
InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// Successfully connected to server
- connectToAddress(unresolvedAddresses.next(), sniHost)
+ connectToAddress(logicalAddress, resolvedPhysicalAddress.next(),
sniHost)
.thenAccept(future::complete)
.exceptionally(exception -> {
- if (unresolvedAddresses.hasNext()) {
+ if (resolvedPhysicalAddress.hasNext()) {
// Try next IP address
- connectToResolvedAddresses(unresolvedAddresses,
sniHost).thenAccept(future::complete)
+ connectToResolvedAddresses(logicalAddress,
resolvedPhysicalAddress, sniHost)
+ .thenAccept(future::complete)
.exceptionally(ex -> {
// This is already unwinding the recursive
call
future.completeExceptionally(ex);
@@ -368,17 +361,22 @@ public class ConnectionPool implements AutoCloseable {
/**
* Attempt to establish a TCP connection to an already resolved single IP
address.
*/
- private CompletableFuture<Channel> connectToAddress(InetSocketAddress
remoteAddress, InetSocketAddress sniHost) {
+ private CompletableFuture<Channel> connectToAddress(InetSocketAddress
logicalAddress,
+ InetSocketAddress
physicalAddress, InetSocketAddress sniHost) {
if (clientConfig.isUseTls()) {
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
- .initTls(channel, sniHost != null ? sniHost :
remoteAddress))
+ .initTls(channel, sniHost != null ? sniHost :
physicalAddress))
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
- .thenCompose(channel ->
toCompletableFuture(channel.connect(remoteAddress)));
+ .thenCompose(ch ->
+ channelInitializerHandler.initializeClientCnx(ch,
logicalAddress, physicalAddress))
+ .thenCompose(channel ->
toCompletableFuture(channel.connect(physicalAddress)));
} else {
return toCompletableFuture(bootstrap.register())
.thenCompose(channelInitializerHandler::initSocks5IfConfig)
- .thenCompose(channel ->
toCompletableFuture(channel.connect(remoteAddress)));
+ .thenCompose(ch ->
+ channelInitializerHandler.initializeClientCnx(ch,
logicalAddress, physicalAddress))
+ .thenCompose(channel ->
toCompletableFuture(channel.connect(physicalAddress)));
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index f51b263277f..e01b53b8ef1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.SecurityUtility;
import
org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.netty.NettyFutureUtil;
@Slf4j
public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel> {
@@ -209,5 +210,29 @@ public class PulsarChannelInitializer extends
ChannelInitializer<SocketChannel>
return initSocks5Future;
}
+
+ CompletableFuture<Channel> initializeClientCnx(Channel ch,
+ InetSocketAddress
logicalAddress,
+ InetSocketAddress
resolvedPhysicalAddress) {
+ return NettyFutureUtil.toCompletableFuture(ch.eventLoop().submit(() ->
{
+ final ClientCnx cnx = (ClientCnx) ch.pipeline().get("handler");
+
+ if (cnx == null) {
+ throw new IllegalStateException("Missing ClientCnx. This
should not happen.");
+ }
+
+ // Need to do our own equality because the physical address is
resolved already
+ if
(!(logicalAddress.getHostString().equalsIgnoreCase(resolvedPhysicalAddress.getHostString())
+ && logicalAddress.getPort() ==
resolvedPhysicalAddress.getPort())) {
+ // We are connecting through a proxy. We need to set the
target broker in the ClientCnx object so that
+ // it can be specified when sending the CommandConnect.
+ cnx.setTargetBroker(logicalAddress);
+ }
+
+ cnx.setRemoteHostName(resolvedPhysicalAddress.getHostString());
+
+ return ch;
+ }));
+ }
}