This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6a605c8a127 [fix] [client] fix same producer/consumer use more than 
one connection per broker (#21144)
6a605c8a127 is described below

commit 6a605c8a127462257eec7e8bf21ad7f0c7b90bd9
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 11 21:06:05 2023 +0800

    [fix] [client] fix same producer/consumer use more than one connection per 
broker (#21144)
    
    Motivation: Pulsar has two mechanisms to guarantee that the result is still 
correct even when a producer connects to the broker multiple times.
    
    - In a connection, the second connection waits for the first connection to 
complete.
    - In a topic, the second connection will override the previous one.
    
    However, if a producer can use different connections to connect to the 
broker, these two mechanisms will not work.
    
    When the config `connectionsPerBroker` of `PulsarClient` is larger than 
`1`, a producer could use more than one connection, leading to the problem 
described above. You can reproduce this issue with the test 
`testSelectConnectionForSameProducer`.
    
    Modifications: Make the same producer/consumer always use the same connection.
---
 .../buffer/impl/TransactionBufferHandlerImpl.java  |  8 +++-
 .../broker/service/AbstractReplicatorTest.java     |  4 ++
 .../pulsar/broker/service/PersistentTopicTest.java |  3 ++
 .../testcontext/NonStartableTestPulsarService.java |  7 +++-
 .../buffer/TransactionBufferClientTest.java        | 22 +++++++++--
 .../buffer/TransactionBufferHandlerImplTest.java   | 16 ++++++--
 .../pulsar/client/impl/ConnectionPoolTest.java     | 43 ++++++++++++++++++++--
 .../pulsar/client/impl/PulsarTestClient.java       |  2 +-
 .../apache/pulsar/compaction/CompactorTest.java    |  8 +++-
 .../pulsar/client/impl/ConnectionHandler.java      |  6 ++-
 .../apache/pulsar/client/impl/ConnectionPool.java  | 16 ++++++--
 .../pulsar/client/impl/PulsarClientImpl.java       | 19 ++++++++--
 .../impl/AcknowledgementsGroupingTrackerTest.java  |  2 +
 .../client/impl/AutoClusterFailoverTest.java       |  9 +++++
 .../client/impl/BatchMessageContainerImplTest.java |  4 ++
 .../pulsar/client/impl/ClientTestFixtures.java     | 11 +++++-
 .../client/impl/ConsumerBuilderImplTest.java       |  4 ++
 .../client/impl/ControlledClusterFailoverTest.java |  3 ++
 .../client/impl/PartitionedProducerImplTest.java   |  2 +
 .../client/impl/ProducerBuilderImplTest.java       |  2 +
 .../client/impl/ProducerStatsRecorderImplTest.java |  4 ++
 .../pulsar/client/impl/PulsarClientImplTest.java   |  3 +-
 .../client/impl/TableViewBuilderImplTest.java      |  2 +
 .../pulsar/client/impl/TableViewImplTest.java      |  2 +
 .../pulsar/client/impl/TopicListWatcherTest.java   | 10 ++++-
 .../client/impl/UnAckedMessageTrackerTest.java     |  4 ++
 .../MultiVersionSchemaInfoProviderTest.java        |  3 ++
 .../pulsar/functions/instance/ContextImplTest.java |  3 ++
 .../pulsar/functions/sink/PulsarSinkTest.java      |  4 ++
 .../pulsar/functions/source/PulsarSourceTest.java  |  5 +++
 .../pulsar/functions/worker/LeaderServiceTest.java |  4 +-
 .../functions/worker/MembershipManagerTest.java    |  4 +-
 32 files changed, 207 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 48dcf259edb..625d27329d3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -61,6 +61,8 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
     private final PulsarService pulsarService;
     private final PulsarClientImpl pulsarClient;
 
+    private final int randomKeyForSelectConnection;
+
     private static final 
AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER 
=
             
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, 
"requestCredits");
     private volatile int requestCredits;
@@ -74,6 +76,7 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
         this.operationTimeoutInMills = operationTimeoutInMills;
         this.timer = timer;
         this.requestCredits = Math.max(100, maxConcurrentRequests);
+        this.randomKeyForSelectConnection = 
pulsarClient.getCnxPool().genRandomKeyToSelectCon();
     }
 
     @Override
@@ -296,7 +299,7 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
     }
 
     public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
-        return pulsarClient.getConnection(topic);
+        return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
     }
 
     public CompletableFuture<ClientCnx> getClientCnx(String topic) {
@@ -317,7 +320,8 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
                                 }
                                 InetSocketAddress brokerAddress =
                                         
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
-                                return 
pulsarClient.getConnection(brokerAddress, brokerAddress);
+                                return 
pulsarClient.getConnection(brokerAddress, brokerAddress,
+                                        randomKeyForSelectConnection);
                             } else {
                                 // Bundle is unloading, lookup topic
                                 return getClientCnxWithLookup(topic);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 294a9b341ec..f8034c37971 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -62,8 +63,11 @@ public class AbstractReplicatorTest {
         final PulsarService pulsar = mock(PulsarService.class);
         final BrokerService broker = mock(BrokerService.class);
         final Topic localTopic = mock(Topic.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
         final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+        when(localClient.getCnxPool()).thenReturn(connectionPool);
         final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+        when(remoteClient.getCnxPool()).thenReturn(connectionPool);
         final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
         final ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
         when(broker.executor()).thenReturn(eventLoopGroup);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 078208f7e44..71310fef810 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -105,6 +105,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -1713,6 +1714,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         ManagedCursor cursor = mock(ManagedCursorImpl.class);
         doReturn(remoteCluster).when(cursor).getName();
         PulsarClientImpl pulsarClientMock = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClientMock.getCnxPool()).thenReturn(connectionPool);
         when(pulsarClientMock.newProducer(any())).thenAnswer(
                 invocation -> {
                     ProducerBuilderImpl producerBuilder =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 2896f338e4a..2027bb33bf1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.testcontext;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import io.netty.channel.EventLoopGroup;
 import java.io.IOException;
 import java.util.Collections;
@@ -42,6 +43,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
@@ -77,7 +79,10 @@ class NonStartableTestPulsarService extends 
AbstractTestPulsarService {
             throw new RuntimeException(e);
         }
         
setSchemaRegistryService(spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class));
-        setClient(mock(PulsarClientImpl.class));
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
+        setClient(mockClient);
         this.namespaceService = mock(NamespaceService.class);
         try {
             startNamespaceService();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 2cfc9f46f0e..3873d9d37b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
@@ -51,6 +53,7 @@ import 
org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import 
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -253,14 +256,21 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
         assertEquals(pending.size(), 1);
     }
 
+    /**
+     * This is a flaky test.
+     */
     @Test
     public void testTransactionBufferClientTimeout() throws Exception {
         PulsarService pulsarService = pulsarServiceList.get(0);
-        PulsarClient mockClient = mock(PulsarClientImpl.class);
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
         CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
         
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
+        when(((PulsarClientImpl)mockClient).getConnection(anyString(), 
anyInt())).thenReturn(completableFuture);
+        when(((PulsarClientImpl)mockClient).getConnection(any(), any(), 
anyInt())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
@@ -287,7 +297,9 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
         ConcurrentSkipListMap<Long, Object> outstandingRequests =
                 (ConcurrentSkipListMap<Long, Object>) 
field.get(transactionBufferHandler);
 
-        assertEquals(outstandingRequests.size(), 1);
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(outstandingRequests.size(), 1);
+        });
 
         Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> {
             if (outstandingRequests.size() == 0) {
@@ -307,11 +319,13 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
     @Test
     public void testTransactionBufferChannelUnActive() throws 
PulsarServerException {
         PulsarService pulsarService = pulsarServiceList.get(0);
-        PulsarClient mockClient = mock(PulsarClientImpl.class);
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
         CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
-        
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
+        when(((PulsarClientImpl)mockClient).getConnection(anyString(), 
anyInt())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index d6ec092c445..278cdbac1f0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -24,6 +24,7 @@ import 
org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -31,8 +32,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -46,7 +47,9 @@ public class TransactionBufferHandlerImplTest {
 
     @Test
     public void testRequestCredits() throws PulsarServerException {
-        PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         PulsarService pulsarService = mock(PulsarService.class);
         NamespaceService namespaceService = mock(NamespaceService.class);
         when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
@@ -54,7 +57,10 @@ public class TransactionBufferHandlerImplTest {
         
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
         Optional<NamespaceEphemeralData> opData = Optional.empty();
         
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
-        
when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+        when(((PulsarClientImpl)pulsarClient).getConnection(anyString(), 
anyInt()))
+                
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+        when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
+                
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
         TransactionBufferHandlerImpl handler = spy(new 
TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
         doNothing().when(handler).endTxn(any());
         
doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString());
@@ -75,7 +81,9 @@ public class TransactionBufferHandlerImplTest {
 
     @Test
     public void testMinRequestCredits() throws PulsarServerException {
-        PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         PulsarService pulsarService = mock(PulsarService.class);
         when(pulsarService.getClient()).thenReturn(pulsarClient);
         TransactionBufferHandlerImpl handler = spy(new 
TransactionBufferHandlerImpl(pulsarService, null, 50, 3000));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index fe0aa4dd495..79ffada4a90 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -31,9 +31,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import io.netty.util.concurrent.Promise;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -80,6 +84,36 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
         eventLoop.shutdownGracefully();
     }
 
+    @Test
+    public void testSelectConnectionForSameProducer() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://sample/standalone/ns/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        final CommandCloseProducer commandCloseProducer = new 
CommandCloseProducer();
+        // 10 connection per broker.
+        final PulsarClient clientWith10ConPerBroker = 
PulsarClient.builder().connectionsPerBroker(10)
+                        .serviceUrl(lookupUrl.toString()).build();
+        ProducerImpl producer = (ProducerImpl) 
clientWith10ConPerBroker.newProducer().topic(topicName).create();
+        commandCloseProducer.setProducerId(producer.producerId);
+        // An error will be reported when the Producer reconnects using a 
different connection.
+        // If no error is reported, the same connection was used when 
reconnecting.
+        for (int i = 0; i < 20; i++) {
+            // Trigger reconnect
+            ClientCnx cnx = producer.getClientCnx();
+            if (cnx != null) {
+                cnx.handleCloseProducer(commandCloseProducer);
+                Awaitility.await().untilAsserted(() ->
+                        Assert.assertEquals(producer.getState().toString(), 
HandlerState.State.Ready.toString(),
+                                "The producer uses a different connection when 
reconnecting")
+                );
+            }
+        }
+
+        // cleanup.
+        producer.close();
+        clientWith10ConPerBroker.close();
+        admin.topics().delete(topicName);
+    }
+
     @Test
     public void testDoubleIpAddress() throws Exception {
         ClientConfigurationData conf = new ClientConfigurationData();
@@ -205,20 +239,23 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
 
         ClientCnx cnx = pool.getConnection(
                 InetSocketAddress.createUnresolved("proxy", 9999),
-                InetSocketAddress.createUnresolved("proxy", 9999)).get();
+                InetSocketAddress.createUnresolved("proxy", 9999),
+                pool.genRandomKeyToSelectCon()).get();
         Assert.assertEquals(cnx.remoteHostName, "proxy");
         Assert.assertNull(cnx.proxyToTargetBrokerAddress);
 
         cnx = pool.getConnection(
                 InetSocketAddress.createUnresolved("broker1", 9999),
-                InetSocketAddress.createUnresolved("proxy", 9999)).get();
+                InetSocketAddress.createUnresolved("proxy", 9999),
+                pool.genRandomKeyToSelectCon()).get();
         Assert.assertEquals(cnx.remoteHostName, "proxy");
         Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker1:9999");
 
 
         cnx = pool.getConnection(
                 InetSocketAddress.createUnresolved("broker2", 9999),
-                InetSocketAddress.createUnresolved("broker2", 9999)).get();
+                InetSocketAddress.createUnresolved("broker2", 9999),
+                pool.genRandomKeyToSelectCon()).get();
         Assert.assertEquals(cnx.remoteHostName, "broker2");
         Assert.assertNull(cnx.proxyToTargetBrokerAddress);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index a725562ac40..6555c152bac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -122,7 +122,7 @@ public class PulsarTestClient extends PulsarClientImpl {
             result.completeExceptionally(new IOException("New connections are 
rejected."));
             return result;
         } else {
-            return super.getConnection(topic);
+            return super.getConnection(topic, 
getCnxPool().genRandomKeyToSelectCon());
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index d97d22ae7f1..4e442ac0513 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.compaction;
 
 import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -258,7 +261,10 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
     public void testPhaseOneLoopTimeConfiguration() {
         ServiceConfiguration configuration = new ServiceConfiguration();
         configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
-        TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, 
Mockito.mock(PulsarClientImpl.class),
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
+        TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, 
mockClient,
                 Mockito.mock(BookKeeper.class), compactionScheduler);
         Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 
60);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 263507dac1d..fc7c89c3ce6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -43,6 +43,7 @@ public class ConnectionHandler {
     private volatile long epoch = -1L;
     protected volatile long lastConnectionClosedTimestamp = 0L;
     private final AtomicBoolean duringConnect = new AtomicBoolean(false);
+    protected final int randomKeyForSelectConnection;
 
     interface Connection {
 
@@ -58,6 +59,7 @@ public class ConnectionHandler {
 
     protected ConnectionHandler(HandlerState state, Backoff backoff, 
Connection connection) {
         this.state = state;
+        this.randomKeyForSelectConnection = 
state.client.getCnxPool().genRandomKeyToSelectCon();
         this.connection = connection;
         this.backoff = backoff;
         CLIENT_CNX_UPDATER.set(this, null);
@@ -88,11 +90,11 @@ public class ConnectionHandler {
             if (state.redirectedClusterURI != null) {
                 InetSocketAddress address = 
InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
                         state.redirectedClusterURI.getPort());
-                cnxFuture = state.client.getConnection(address, address);
+                cnxFuture = state.client.getConnection(address, address, 
randomKeyForSelectConnection);
             } else if (state.topic == null) {
                 cnxFuture = state.client.getConnectionToServiceUrl();
             } else {
-                cnxFuture = state.client.getConnection(state.topic); //
+                cnxFuture = state.client.getConnection(state.topic, 
randomKeyForSelectConnection);
             }
             cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
                     .thenAccept(__ -> duringConnect.set(false))
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 1420d81c688..ef3a9249c82 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -165,8 +165,18 @@ public class ConnectionPool implements AutoCloseable {
 
     private static final Random random = new Random();
 
+    public int genRandomKeyToSelectCon() {
+        if (maxConnectionsPerHosts == 0) {
+            return -1;
+        }
+        return signSafeMod(random.nextInt(), maxConnectionsPerHosts);
+    }
+
     public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress 
address) {
-        return getConnection(address, address);
+        if (maxConnectionsPerHosts == 0) {
+            return getConnection(address, address, -1);
+        }
+        return getConnection(address, address, signSafeMod(random.nextInt(), 
maxConnectionsPerHosts));
     }
 
     void closeAllConnections() {
@@ -204,14 +214,12 @@ public class ConnectionPool implements AutoCloseable {
      * @return a future that will produce the ClientCnx object
      */
     public CompletableFuture<ClientCnx> getConnection(InetSocketAddress 
logicalAddress,
-            InetSocketAddress physicalAddress) {
+            InetSocketAddress physicalAddress, final int randomKey) {
         if (maxConnectionsPerHosts == 0) {
             // Disable pooling
             return createConnection(logicalAddress, physicalAddress, -1);
         }
 
-        final int randomKey = signSafeMod(random.nextInt(), 
maxConnectionsPerHosts);
-
         final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
                 pool.computeIfAbsent(logicalAddress, a -> new 
ConcurrentHashMap<>());
         CompletableFuture<ClientCnx> completableFuture = innerPool
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6c749a8cf43..fdabb5fa8cf 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -944,10 +944,20 @@ public class PulsarClientImpl implements PulsarClient {
         conf.setTlsTrustStorePassword(tlsTrustStorePassword);
     }
 
+    public CompletableFuture<ClientCnx> getConnection(final String topic, int 
randomKeyForSelectConnection) {
+        TopicName topicName = TopicName.get(topic);
+        return lookup.getBroker(topicName)
+                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight(), randomKeyForSelectConnection));
+    }
+
+    /**
+     * Only for test.
+     */
+    @VisibleForTesting
     public CompletableFuture<ClientCnx> getConnection(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return lookup.getBroker(topicName)
-                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight()));
+                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
     }
 
     public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
@@ -956,12 +966,13 @@ public class PulsarClientImpl implements PulsarClient {
                     "Can't get client connection to HTTP service URL", null));
         }
         InetSocketAddress address = lookup.resolveHost();
-        return getConnection(address, address);
+        return getConnection(address, address, 
cnxPool.genRandomKeyToSelectCon());
     }
 
     public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress 
logicalAddress,
-                                                      final InetSocketAddress 
physicalAddress) {
-        return cnxPool.getConnection(logicalAddress, physicalAddress);
+                                                      final InetSocketAddress 
physicalAddress,
+                                                      final int 
randomKeyForSelectConnection) {
+        return cnxPool.getConnection(logicalAddress, physicalAddress, 
randomKeyForSelectConnection);
     }
 
     /** visible for pulsar-functions. **/
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 0418a54c772..1d1a6f85bfd 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -65,6 +65,8 @@ public class AcknowledgementsGroupingTrackerTest {
                 ConcurrentOpenHashMap.<MessageIdAdv, 
MessageIdImpl[]>newBuilder().build();
         cnx = spy(new ClientCnxTest(new ClientConfigurationData(), 
eventLoopGroup));
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         doReturn(client).when(consumer).getClient();
         doReturn(cnx).when(consumer).getClientCnx();
         doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index 36ffa30296b..63fbb239439 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -121,6 +122,8 @@ public class AutoClusterFailoverTest {
 
         AutoClusterFailover autoClusterFailover = 
Mockito.spy((AutoClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
         
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -163,6 +166,8 @@ public class AutoClusterFailoverTest {
 
         AutoClusterFailover autoClusterFailover = 
Mockito.spy((AutoClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
         
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -217,6 +222,8 @@ public class AutoClusterFailoverTest {
 
         AutoClusterFailover autoClusterFailover = 
Mockito.spy((AutoClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
         
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -270,6 +277,8 @@ public class AutoClusterFailoverTest {
 
         AutoClusterFailover autoClusterFailover = 
Mockito.spy((AutoClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
         
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 4b80e19c256..abb195c9830 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -105,6 +105,8 @@ public class BatchMessageContainerImplTest {
         final ProducerConfigurationData producerConfigurationData = new 
ProducerConfigurationData();
         producerConfigurationData.setCompressionType(CompressionType.NONE);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         MemoryLimitController memoryLimitController = 
mock(MemoryLimitController.class);
         
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
         try {
@@ -148,6 +150,8 @@ public class BatchMessageContainerImplTest {
         final ProducerConfigurationData producerConfigurationData = new 
ProducerConfigurationData();
         producerConfigurationData.setCompressionType(CompressionType.NONE);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         MemoryLimitController memoryLimitController = 
mock(MemoryLimitController.class);
         
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
         try {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 4db5dbe8776..ff7d7f12dd4 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import io.netty.channel.ChannelHandlerContext;
@@ -72,11 +74,16 @@ class ClientTestFixtures {
                 
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
         
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
         
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
-        when(clientMock.getConnection(any(), 
any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        
when(clientMock.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        when(clientMock.getConnection(anyString(), anyInt()))
+                .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        when(clientMock.getConnection(any(), any(), anyInt()))
+                .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
         ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
         when(clientMock.getCnxPool()).thenReturn(connectionPoolMock);
         
when(connectionPoolMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
-        when(connectionPoolMock.getConnection(any(), 
any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        when(connectionPoolMock.getConnection(any(), any(), anyInt()))
+                .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
         return clientMock;
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 8dbd23f9c29..3fe13663046 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -76,6 +76,8 @@ public class ConsumerBuilderImplTest {
     @BeforeMethod(alwaysRun = true)
     public void setup() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         ConsumerConfigurationData consumerConfigurationData = 
mock(ConsumerConfigurationData.class);
         
when(consumerConfigurationData.getTopicsPattern()).thenReturn(Pattern.compile("\\w+"));
         
when(consumerConfigurationData.getSubscriptionName()).thenReturn("testSubscriptionName");
@@ -104,6 +106,8 @@ public class ConsumerBuilderImplTest {
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testConsumerBuilderImplWhenSchemaIsNull() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         ConsumerConfigurationData consumerConfigurationData = 
mock(ConsumerConfigurationData.class);
         new ConsumerBuilderImpl(client, consumerConfigurationData, null);
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index 570b1398328..227e0db10b7 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -31,6 +31,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @Test(groups = "broker-impl")
 public class ControlledClusterFailoverTest {
@@ -88,6 +89,8 @@ public class ControlledClusterFailoverTest {
 
         ControlledClusterFailover controlledClusterFailover = 
Mockito.spy((ControlledClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
 
         controlledClusterFailover.initialize(pulsarClient);
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index 223881d85a8..2bd18f69386 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -70,6 +70,8 @@ public class PartitionedProducerImplTest {
     @BeforeMethod(alwaysRun = true)
     public void setup() {
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         schema = mock(Schema.class);
         producerInterceptors = mock(ProducerInterceptors.class);
         producerCreatedFuture = new CompletableFuture<>();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index bb3e3fc3acc..b830d375303 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -52,6 +52,8 @@ public class ProducerBuilderImplTest {
     public void setup() {
         Producer<?> producer = mock(Producer.class);
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
         when(client.newProducer()).thenReturn(producerBuilderImpl);
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index 32d0eff6e79..27e2dcb37ce 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -40,6 +40,8 @@ public class ProducerStatsRecorderImplTest {
         ClientConfigurationData conf = new ClientConfigurationData();
         conf.setStatsIntervalSeconds(1);
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.getConfiguration()).thenReturn(conf);
         Timer timer = new HashedWheelTimer();
         when(client.timer()).thenReturn(timer);
@@ -60,6 +62,8 @@ public class ProducerStatsRecorderImplTest {
         ClientConfigurationData conf = new ClientConfigurationData();
         conf.setStatsIntervalSeconds(60);
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.getConfiguration()).thenReturn(conf);
         Timer timer = new HashedWheelTimer();
         when(client.timer()).thenReturn(timer);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 54d13538d78..e0b25db8912 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.mock;
@@ -122,7 +123,7 @@ public class PulsarClientImplTest {
         when(cnx.ctx()).thenReturn(ctx);
         when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong()))
                 
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
-        when(pool.getConnection(any(InetSocketAddress.class), 
any(InetSocketAddress.class)))
+        when(pool.getConnection(any(InetSocketAddress.class), 
any(InetSocketAddress.class), anyInt()))
                 .thenReturn(CompletableFuture.completedFuture(cnx));
 
         ClientConfigurationData conf = new ClientConfigurationData();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
index 9959a203855..eee8ba4e8f4 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
@@ -52,6 +52,8 @@ public class TableViewBuilderImplTest {
         Reader reader = mock(Reader.class);
         when(reader.readNextAsync()).thenReturn(CompletableFuture.allOf());
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.newReader(any(Schema.class)))
             .thenReturn(new ReaderBuilderImpl(client, Schema.BYTES));
         when(client.createReaderAsync(any(ReaderConfigurationData.class), 
any(Schema.class)))
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java
index 68c886bc721..6a866034ddb 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java
@@ -38,6 +38,8 @@ public class TableViewImplTest {
     @BeforeClass(alwaysRun = true)
     public void setup() {
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.newReader(any(Schema.class)))
             .thenReturn(new ReaderBuilderImpl(client, Schema.BYTES));
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index e462bb4d62c..1b39448fbe7 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -28,7 +28,9 @@ import 
org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.naming.NamespaceName;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -52,6 +54,9 @@ public class TopicListWatcherTest {
     public void setup() {
         listener = mock(TopicsChangedListener.class);
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
+        when(connectionPool.genRandomKeyToSelectCon()).thenReturn(0);
         when(client.getConfiguration()).thenReturn(new 
ClientConfigurationData());
         clientCnxFuture = new CompletableFuture<>();
         when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture);
@@ -59,6 +64,9 @@ public class TopicListWatcherTest {
         when(client.timer()).thenReturn(timer);
         String topic = "persistent://tenant/ns/topic\\d+";
         when(client.getConnection(topic)).thenReturn(clientCnxFuture);
+        when(client.getConnection(topic, 0)).thenReturn(clientCnxFuture);
+        when(client.getConnection(any(), any(), 
anyInt())).thenReturn(clientCnxFuture);
+        when(connectionPool.getConnection(any(), any(), 
anyInt())).thenReturn(clientCnxFuture);
         watcherFuture = new CompletableFuture<>();
         watcher = new TopicListWatcher(listener, client,
                 Pattern.compile(topic), 7,
@@ -67,7 +75,7 @@ public class TopicListWatcherTest {
 
     @Test
     public void testWatcherGrabsConnection() {
-        verify(client).getConnection(any());
+        verify(client).getConnection(anyString(), anyInt());
     }
 
     @Test
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
index f6c668703d9..91ad3210482 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
@@ -45,6 +45,8 @@ public class UnAckedMessageTrackerTest  {
     @Test
     public void testAddAndRemove() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
                 1, TimeUnit.MILLISECONDS);
         when(client.timer()).thenReturn(timer);
@@ -83,6 +85,8 @@ public class UnAckedMessageTrackerTest  {
     @Test
     public void testTrackChunkedMessageId() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
                 1, TimeUnit.MILLISECONDS);
         when(client.timer()).thenReturn(timer);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index 8959e670234..bfd6af37e3e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -27,6 +27,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -46,6 +47,8 @@ public class MultiVersionSchemaInfoProviderTest {
     @BeforeMethod
     public void setup() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.getLookup()).thenReturn(mock(LookupService.class));
         schemaProvider = new MultiVersionSchemaInfoProvider(
                 TopicName.get("persistent://public/default/my-topic"), client);
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index e0ebb52da74..90f7df37fa1 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.ProducerBase;
@@ -99,6 +100,8 @@ public class ContextImplTest {
 
         producer = mock(Producer.class);
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, 
Schema.BYTES));
         when(client.createProducerAsync(any(ProducerConfigurationData.class), 
any(), any()))
                 .thenReturn(CompletableFuture.completedFuture(producer));
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index fdac39512cc..799bad839a4 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
@@ -56,6 +57,7 @@ import 
org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -95,6 +97,8 @@ public class PulsarSinkTest {
      */
     private static PulsarClientImpl getPulsarClient() throws 
PulsarClientException {
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
         doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
         
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 91e4c06fe5b..5d6e4a3dc75 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.functions.source;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertSame;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
@@ -44,6 +46,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.ConsumerConfig;
@@ -105,6 +108,8 @@ public class PulsarSourceTest {
      */
     private static PulsarClientImpl getPulsarClient() throws 
PulsarClientException {
         PulsarClientImpl pulsarClient = Mockito.mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         ConsumerBuilder<?> goodConsumerBuilder = 
Mockito.mock(ConsumerBuilder.class);
         ConsumerBuilder<?> badConsumerBuilder = 
Mockito.mock(ConsumerBuilder.class);
         Mockito.doReturn(goodConsumerBuilder).when(goodConsumerBuilder)
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
index 8da24fd1b72..5c10a59bd13 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -76,7 +77,8 @@ public class LeaderServiceTest {
     @BeforeMethod
     public void setup() throws PulsarClientException {
         mockClient = mock(PulsarClientImpl.class);
-
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
         mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index e066bb24e6e..ac3176b3135 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.WorkerInfo;
@@ -68,7 +69,8 @@ public class MembershipManagerTest {
 
     private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
-
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
         ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
 

Reply via email to