This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 0a4639e4bd7 [fix][client] Fix race condition that leads to caching
failed CompletableFutures in ConnectionPool (#19661)
0a4639e4bd7 is described below
commit 0a4639e4bd7af875603382509166e51f51f925b3
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Feb 28 12:17:33 2023 +0100
[fix][client] Fix race condition that leads to caching failed
CompletableFutures in ConnectionPool (#19661)
(cherry picked from commit 69fb3c2ca3faa32ff12fd1270730b3517ea69220)
---
.../org/apache/pulsar/client/impl/ConnectionPool.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 0071aa91fd2..659b54fe747 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -174,8 +174,19 @@ public class ConnectionPool implements AutoCloseable {
final int randomKey = signSafeMod(random.nextInt(),
maxConnectionsPerHosts);
- return pool.computeIfAbsent(logicalAddress, a -> new
ConcurrentHashMap<>()) //
+ final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
+ pool.computeIfAbsent(logicalAddress, a -> new
ConcurrentHashMap<>());
+ CompletableFuture<ClientCnx> completableFuture = innerPool
.computeIfAbsent(randomKey, k ->
createConnection(logicalAddress, physicalAddress, randomKey));
+ if (completableFuture.isCompletedExceptionally()) {
+ // we cannot cache a failed connection, so we remove it from the pool;
+ // there is a race condition in which
+ // cleanupConnection is called before this result is cached,
+ // and so that cleanup fails
+ cleanupConnection(logicalAddress, randomKey, completableFuture);
+ }
+
+ return completableFuture;
}
private CompletableFuture<ClientCnx> createConnection(InetSocketAddress
logicalAddress,
@@ -217,6 +228,10 @@ public class ConnectionPool implements AutoCloseable {
}).exceptionally(exception -> {
log.warn("[{}] Connection handshake failed: {}",
cnx.channel(), exception.getMessage());
cnxFuture.completeExceptionally(exception);
+ // this cleanupConnection may run before the
+ // CompletableFuture is cached in the "pool" map, so
+ // it is not enough to clean it here: we need to clean it
+ // in the "pool" map when the CompletableFuture is cached
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
cnx.ctx().close();
return null;