michaeljmarshall commented on issue #13923:
URL: https://github.com/apache/pulsar/issues/13923#issuecomment-1404366610
I am pretty confident I found the root cause. If you update the
`ConnectionPool` to use the following logic, the ProxyTest class fails with the
same errors:
```
createConnection(physicalAddress).thenAcceptAsync(channel -> {
    log.info("[{}] Connected to server", channel);

    channel.closeFuture().addListener(v -> {
        // Remove connection from pool when it gets closed
        if (log.isDebugEnabled()) {
            log.debug("Removing closed connection from pool: {}", v);
        }
        cleanupConnection(logicalAddress, connectionKey, cnxFuture);
    });

    // We are connected to broker, but need to wait until the connect/connected handshake is
    // complete
    final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
    if (!channel.isActive() || cnx == null) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Connection was already closed by the time we got notified", channel);
        }
        cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
        return;
    }

    if (!logicalAddress.equals(physicalAddress)) {
        // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
        // it can be specified when sending the CommandConnect.
        // That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
        // this method.
        cnx.setTargetBroker(logicalAddress);
    }

    cnx.setRemoteHostName(physicalAddress.getHostString());

    cnx.connectionFuture().thenRun(() -> {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Connection handshake completed", cnx.channel());
        }
        cnxFuture.complete(cnx);
    }).exceptionally(exception -> {
        log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage());
        cnxFuture.completeExceptionally(exception);
        cleanupConnection(logicalAddress, connectionKey, cnxFuture);
        cnx.ctx().close();
        return null;
    });
}, ForkJoinPool.commonPool())
```
The one detail I haven't figured out is _why_ the callback sometimes runs late.
Note that this explanation aligns closely with the earlier observation that
we didn't see the client log about connecting to the broker. The log would have
come from here:
https://github.com/apache/pulsar/blob/f3608074f537471545926a84df8a1d061b30692c/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L264-L270
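
A minimal, self-contained sketch of the timing problem described above. This is not Pulsar code: `LateCallbackDemo`, `run`, and the queue-backed `lateExecutor` are hypothetical names used only for illustration, and the string `"broker-1"` stands in for the target broker address. It assumes the failure comes from the callback (which sets the target broker) running after the step that reads it, which is what the comment in the snippet about `ClientCnx.connectionActive()` suggests.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class LateCallbackDemo {

    /**
     * Returns the target that the simulated handshake step observed.
     * "<unset>" means the callback had not run yet when the handshake read it.
     */
    static String run(boolean async) {
        // Hypothetical executor that only queues tasks, so they always run "late".
        Queue<Runnable> deferred = new ArrayDeque<>();
        Executor lateExecutor = deferred::add;

        AtomicReference<String> target = new AtomicReference<>("<unset>");
        CompletableFuture<String> channelFuture = new CompletableFuture<>();

        if (async) {
            // Callback is handed to another executor, as in the modified snippet.
            channelFuture.thenAcceptAsync(target::set, lateExecutor);
        } else {
            // Callback runs on the thread that completes the future.
            channelFuture.thenAccept(target::set);
        }

        // The connection is established.
        channelFuture.complete("broker-1");

        // Stand-in for the step that reads the target broker right afterwards.
        String seenByHandshake = target.get();

        // The deferred callback finally runs, too late to matter.
        Runnable task;
        while ((task = deferred.poll()) != null) {
            task.run();
        }
        return seenByHandshake;
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + run(false)); // prints "sync:  broker-1"
        System.out.println("async: " + run(true));  // prints "async: <unset>"
    }
}
```

In the synchronous case the callback finishes before the next step reads the value; in the deferred case the reader sees the unset value. Whether the real failure follows exactly this path is an assumption, not something the sketch proves.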