racorn commented on pull request #8062:
URL: https://github.com/apache/pulsar/pull/8062#issuecomment-693001175
@rdhabalia Thank you were much! It looks good, my only concern is that now
`private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int
port, InetSocketAddress sniHost)` does synchronous channel initialization.
To do asynchronous channel initialization, this is a possible attempt:
```
private CompletableFuture<Channel> connectToAddress(final InetAddress
ipAddress, final int port,
final InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// if proxy is configured in pulsar-client then make it thread-safe
while updating
// channelInitializerHandler
if (isSniProxy) {
bootstrap.register().addListener((ChannelFutureListener) cf -> {
if (cf.isSuccess()) {
try {
channelInitializerHandler.initChannel(cf.channel(),
sniHost);
cf.channel().connect(new
InetSocketAddress(ipAddress, port))
.addListener((ChannelFutureListener)
channelFuture -> {
if (channelFuture.isSuccess()) {
future.complete(channelFuture.channel());
} else {
future.completeExceptionally(channelFuture.cause());
}
});
} catch (Exception e) {
log.warn("Failed to initialize channel with {}, {}",
ipAddress, sniHost, e);
future.completeExceptionally(e);
}
} else {
future.completeExceptionally(cf.cause());
}
});
} else {
bootstrap.connect(ipAddress,
port).addListener((ChannelFutureListener) cf -> {
if (cf.isSuccess()) {
future.complete(cf.channel());
} else {
future.completeExceptionally(cf.cause());
}
});
}
return future;
}
```
Here, the `PulsarChannelInitializer.initChannel(cf.channel(), sniHost)` is
run in the callback thread, I assume it will be the Channel event loop thread.
If that is not good enough, one could take it a step further and make
`PulsarChannelInitializer.initChannel(cf.channel(), sniHost)` private, then add
a new method to `PulsarChannelInitializer`:
```
public CompletableFuture<Channel> initSniSsl(final Channel ch, final
InetSocketAddress sniHost) {
if (!isSniProxyEnabled) {
throw new IllegalStateException("This method can only be invoked
when SniProxy is enabled");
}
CompletableFuture<Channel> initFuture = new CompletableFuture<>();
ch.eventLoop().execute(() -> {
try {
initChannel(ch, sniHost);
initFuture.complete(ch);
} catch (Throwable t) {
initFuture.completeExceptionally(t);
}
});
return initFuture;
}
```
Then update ConnectionPool:
```
private CompletableFuture<Channel> connectToAddress(final InetAddress
ipAddress, final int port,
final InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// if proxy is configured in pulsar-client then make it thread-safe
while updating
// channelInitializerHandler
if (isSniProxy) {
bootstrap.register().addListener((ChannelFutureListener) cf -> {
if (cf.isSuccess()) {
try {
initSniSsl(cf.channel(), sniHost)
.whenComplete((channel, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
channel.connect(new
InetSocketAddress(ipAddress, port))
.addListener((ChannelFutureListener) channelFuture -> {
if
(channelFuture.isSuccess()) {
future.complete(channelFuture.channel());
} else {
future.completeExceptionally(cf.cause());
}
});
}
});
} catch (Throwable t) {
future.completeExceptionally(t);
}
} else {
future.completeExceptionally(cf.cause());
}
});
} else {
bootstrap.connect(ipAddress,
port).addListener((ChannelFutureListener) cf -> {
if (cf.isSuccess()) {
future.complete(cf.channel());
} else {
future.completeExceptionally(cf.cause());
}
});
}
return future;
}
```
A bit complicated when having to deal with both ChannelFutures and
CompletableFutures :-)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]