racorn commented on pull request #8062:
URL: https://github.com/apache/pulsar/pull/8062#issuecomment-693001175


   @rdhabalia Thank you were much! It looks good, my only concern is that now 
`private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int 
port, InetSocketAddress sniHost)` does synchronous channel initialization.
   
   To do asynchronous channel initialization, this is a possible attempt:
    ```
   private CompletableFuture<Channel> connectToAddress(final InetAddress 
ipAddress, final int port,
               final InetSocketAddress sniHost) {
           CompletableFuture<Channel> future = new CompletableFuture<>();
           // if proxy is configured in pulsar-client then make it thread-safe 
while updating
           // channelInitializerHandler
           if (isSniProxy) {
               bootstrap.register().addListener((ChannelFutureListener) cf -> {
                   if (cf.isSuccess()) {
                       try {
                           channelInitializerHandler.initChannel(cf.channel(), 
sniHost);
                           cf.channel().connect(new 
InetSocketAddress(ipAddress, port))
                                   .addListener((ChannelFutureListener) 
channelFuture -> {
                                       if (channelFuture.isSuccess()) {
                                           
future.complete(channelFuture.channel());
                                       } else {
                                           
future.completeExceptionally(channelFuture.cause());
                                       }
                                   });
                       } catch (Exception e) {
                           log.warn("Failed to initialize channel with {}, {}", 
ipAddress, sniHost, e);
                           future.completeExceptionally(e);
                       }
                   } else {
                       future.completeExceptionally(cf.cause());
                   }
               });
           } else {
               bootstrap.connect(ipAddress, 
port).addListener((ChannelFutureListener) cf -> {
                   if (cf.isSuccess()) {
                       future.complete(cf.channel());
                   } else {
                       future.completeExceptionally(cf.cause());
                   }
               });
           }
           return future;
       }
   
   ```
   Here, the `PulsarChannelInitializer.initChannel(cf.channel(), sniHost)` is 
run in the callback thread, I assume it will be the Channel event loop thread. 
If that is not good enough, one could take it a step further and make 
`PulsarChannelInitializer.initChannel(cf.channel(), sniHost)` private, then add 
a new method to `PulsarChannelInitializer`:
   
   ```
       public CompletableFuture<Channel> initSniSsl(final Channel ch, final 
InetSocketAddress sniHost) {
           if (!isSniProxyEnabled) {
               throw new IllegalStateException("This method can only be invoked 
when SniProxy is enabled");
           }
           CompletableFuture<Channel> initFuture = new CompletableFuture<>();
           ch.eventLoop().execute(() -> {
               try {
                   initChannel(ch, sniHost);
                   initFuture.complete(ch);
               } catch (Throwable t) {
                   initFuture.completeExceptionally(t);
               }
           });
           return initFuture;
       }
   ```
   
   Then update ConnectionPool:
   ```
       private CompletableFuture<Channel> connectToAddress(final InetAddress 
ipAddress, final int port,
               final InetSocketAddress sniHost) {
           CompletableFuture<Channel> future = new CompletableFuture<>();
           // if proxy is configured in pulsar-client then make it thread-safe 
while updating
           // channelInitializerHandler
           if (isSniProxy) {
               bootstrap.register().addListener((ChannelFutureListener) cf -> {
                   if (cf.isSuccess()) {
                       try {
                           initSniSsl(cf.channel(), sniHost)
                                   .whenComplete((channel, error) -> {
                                       if (error != null) {
                                           future.completeExceptionally(error);
                                       } else {
                                           channel.connect(new 
InetSocketAddress(ipAddress, port))
                                                   
.addListener((ChannelFutureListener) channelFuture -> {
                                                       if 
(channelFuture.isSuccess()) {
                                                           
future.complete(channelFuture.channel());
                                                       } else {
                                                           
future.completeExceptionally(cf.cause());
                                                       }
                                                   });
                                       }
                                   });
                       } catch (Throwable t) {
                           future.completeExceptionally(t);
                       }
                   } else {
                       future.completeExceptionally(cf.cause());
                   }
               });
           } else {
               bootstrap.connect(ipAddress, 
port).addListener((ChannelFutureListener) cf -> {
                   if (cf.isSuccess()) {
                       future.complete(cf.channel());
                   } else {
                       future.completeExceptionally(cf.cause());
                   }
               });
           }
           return future;
       }
   ```
   
   A bit complicated when having to deal with both ChannelFutures and 
CompletableFutures :-)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to