This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 0a4639e4bd7 [fix][client] Fix race condition that leads to caching 
failed CompletableFutures in ConnectionPool (#19661)
0a4639e4bd7 is described below

commit 0a4639e4bd7af875603382509166e51f51f925b3
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Feb 28 12:17:33 2023 +0100

    [fix][client] Fix race condition that leads to caching failed 
CompletableFutures in ConnectionPool (#19661)
    
    (cherry picked from commit 69fb3c2ca3faa32ff12fd1270730b3517ea69220)
---
 .../org/apache/pulsar/client/impl/ConnectionPool.java   | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 0071aa91fd2..659b54fe747 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -174,8 +174,19 @@ public class ConnectionPool implements AutoCloseable {
 
         final int randomKey = signSafeMod(random.nextInt(), 
maxConnectionsPerHosts);
 
-        return pool.computeIfAbsent(logicalAddress, a -> new 
ConcurrentHashMap<>()) //
+        final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
+                pool.computeIfAbsent(logicalAddress, a -> new 
ConcurrentHashMap<>());
+        CompletableFuture<ClientCnx> completableFuture = innerPool
                 .computeIfAbsent(randomKey, k -> 
createConnection(logicalAddress, physicalAddress, randomKey));
+        if (completableFuture.isCompletedExceptionally()) {
+            // we cannot cache a failed connection, so we remove it from the 
pool
+            // there is a race condition in which
+            // cleanupConnection is called before caching this result
+            // and so the clean up fails
+            cleanupConnection(logicalAddress, randomKey, completableFuture);
+        }
+
+        return completableFuture;
     }
 
     private CompletableFuture<ClientCnx> createConnection(InetSocketAddress 
logicalAddress,
@@ -217,6 +228,10 @@ public class ConnectionPool implements AutoCloseable {
             }).exceptionally(exception -> {
                 log.warn("[{}] Connection handshake failed: {}", 
cnx.channel(), exception.getMessage());
                 cnxFuture.completeExceptionally(exception);
+                // this cleanupConnection may happen before that the
+                // CompletableFuture is cached into the "pool" map,
+                // it is not enough to clean it here, we need to clean it
+                // in the "pool" map when the CompletableFuture is cached
                 cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                 cnx.ctx().close();
                 return null;

Reply via email to