This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 69fb3c2ca3f [fix][client] Fix race condition that leads to caching failed CompletableFutures in ConnectionPool (#19661)
69fb3c2ca3f is described below
commit 69fb3c2ca3faa32ff12fd1270730b3517ea69220
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Feb 28 12:17:33 2023 +0100
[fix][client] Fix race condition that leads to caching failed CompletableFutures in ConnectionPool (#19661)
---
.../java/org/apache/pulsar/client/impl/ConnectionPool.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 3a9a2b9b7ab..1420d81c688 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -216,6 +216,15 @@ public class ConnectionPool implements AutoCloseable {
pool.computeIfAbsent(logicalAddress, a -> new
ConcurrentHashMap<>());
CompletableFuture<ClientCnx> completableFuture = innerPool
.computeIfAbsent(randomKey, k ->
createConnection(logicalAddress, physicalAddress, randomKey));
+ if (completableFuture.isCompletedExceptionally()) {
+ // we cannot cache a failed connection, so we remove it from the
pool
+ // there is a race condition in which
+ // cleanupConnection is called before caching this result
+ // and so the clean up fails
+ cleanupConnection(logicalAddress, randomKey, completableFuture);
+ return completableFuture;
+ }
+
return completableFuture.thenCompose(clientCnx -> {
// If connection already release, create a new one.
if (clientCnx.getIdleState().isReleased()) {
@@ -274,6 +283,10 @@ public class ConnectionPool implements AutoCloseable {
}).exceptionally(exception -> {
log.warn("[{}] Connection handshake failed: {}",
cnx.channel(), exception.getMessage());
cnxFuture.completeExceptionally(exception);
+ // this cleanupConnection may happen before that the
+ // CompletableFuture is cached into the "pool" map,
+ // it is not enough to clean it here, we need to clean it
+ // in the "pool" map when the CompletableFuture is cached
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
cnx.ctx().close();
return null;