michaeljmarshall commented on issue #13923:
URL: https://github.com/apache/pulsar/issues/13923#issuecomment-1404366610

I am pretty confident I found the root cause. If you update the `ConnectionPool` so that the connect callback runs asynchronously on `ForkJoinPool.commonPool()` (note the `thenAcceptAsync` call below), the `ProxyTest` class fails with the same errors:
   
```java
createConnection(physicalAddress).thenAcceptAsync(channel -> {
    log.info("[{}] Connected to server", channel);

    channel.closeFuture().addListener(v -> {
        // Remove connection from pool when it gets closed
        if (log.isDebugEnabled()) {
            log.debug("Removing closed connection from pool: {}", v);
        }
        cleanupConnection(logicalAddress, connectionKey, cnxFuture);
    });

    // We are connected to broker, but need to wait until the connect/connected handshake is
    // complete
    final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
    if (!channel.isActive() || cnx == null) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Connection was already closed by the time we got notified", channel);
        }
        cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
        return;
    }

    if (!logicalAddress.equals(physicalAddress)) {
        // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
        // it can be specified when sending the CommandConnect.
        // That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
        // this method.
        cnx.setTargetBroker(logicalAddress);
    }

    cnx.setRemoteHostName(physicalAddress.getHostString());

    cnx.connectionFuture().thenRun(() -> {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Connection handshake completed", cnx.channel());
        }
        cnxFuture.complete(cnx);
    }).exceptionally(exception -> {
        log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage());
        cnxFuture.completeExceptionally(exception);
        cleanupConnection(logicalAddress, connectionKey, cnxFuture);
        cnx.ctx().close();
        return null;
    });
}, ForkJoinPool.commonPool())
```
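To make the timing argument concrete, here is a minimal, self-contained sketch of the plain `CompletableFuture` ordering the repro relies on. It is not Pulsar code: `FakeCnx`, `run`, and the `"broker:6650"` value are hypothetical stand-ins, and it assumes (as the code comment above says) that activation happens immediately after the connect callback on the same thread. A dependent registered with `thenAccept` before completion runs synchronously inside `complete()`, so the target broker is set before the simulated `channelActive`. With `thenAcceptAsync`, the callback is handed to an executor (here a hypothetical one that only queues the task) and runs after the `CommandConnect` stand-in has already been captured without a target broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class CallbackOrderingSketch {

    // Hypothetical stand-in for ClientCnx: tracks only the target broker
    // and the value a CommandConnect would have carried.
    static final class FakeCnx {
        volatile String targetBroker;    // set by the pool callback
        volatile String connectSentWith; // captured when channelActive "sends" CommandConnect

        void channelActive() {
            // Stand-in for ClientCnx building CommandConnect on activation.
            connectSentWith = targetBroker;
        }
    }

    // Simulates: connect future completes, then channelActive fires right away on the same thread.
    static String run(boolean async) {
        FakeCnx cnx = new FakeCnx();
        CompletableFuture<FakeCnx> connectFuture = new CompletableFuture<>();
        List<Runnable> deferred = new ArrayList<>();
        Executor deferringExecutor = deferred::add; // hypothetical executor: queues work, runs nothing

        if (async) {
            connectFuture.thenAcceptAsync(c -> c.targetBroker = "broker:6650", deferringExecutor);
        } else {
            connectFuture.thenAccept(c -> c.targetBroker = "broker:6650");
        }

        connectFuture.complete(cnx);     // non-async dependents run here, synchronously
        cnx.channelActive();             // "CommandConnect" is built now

        deferred.forEach(Runnable::run); // the async callback finally runs, too late
        return cnx.connectSentWith;
    }

    public static void main(String[] args) {
        System.out.println("thenAccept:      " + run(false));
        System.out.println("thenAcceptAsync: " + run(true));
    }
}
```

Running `main` prints `broker:6650` for the `thenAccept` case and `null` for the `thenAcceptAsync` case. This only illustrates the ordering effect of running the callback off the completing thread; it does not explain why the unmodified callback sometimes runs late.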
   
   The one detail I haven't found is _why_ the callback is sometimes run late.
   
Note that this explanation aligns closely with the earlier observation that we didn't see the client log about connecting to the broker. The log would have come from here:

https://github.com/apache/pulsar/blob/f3608074f537471545926a84df8a1d061b30692c/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L264-L270

