This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 7b4f82b0222 [fix] [client] fix same producer/consumer use more than 
one connection per broker (#21144)
7b4f82b0222 is described below

commit 7b4f82b02224d5758552da2d3860ed19b31ef811
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 11 21:06:05 2023 +0800

    [fix] [client] fix same producer/consumer use more than one connection per 
broker (#21144)
    
    Motivation: Pulsar has two mechanisms to guarantee that a producer connects 
to the broker multiple times the result is still correct.
    
    - In a connection, the second connection waits for the first connection to 
complete.
    - In a topic, the second connection will override the previous one.
    
    However, if a producer can use different connections to connect to the 
broker, these two mechanisms will not work.
    
    When the config `connectionsPerBroker` of `PulsarClient` is larger than 
`1`, a producer could use more than one connection, leading to the error above. 
You can reproduce this issue by the test `testSelectConnectionForSameProducer.`
    
    Modifications: Make the same producer/consumer usage the same connection
    (cherry picked from commit f2b9a3fffd4aad414ba5f35ebff17502cb64eb12)
---
 .../buffer/impl/TransactionBufferHandlerImpl.java  |  8 +++-
 .../broker/service/AbstractReplicatorTest.java     |  4 ++
 .../buffer/TransactionBufferClientTest.java        | 19 ++++++++--
 .../buffer/TransactionBufferHandlerImplTest.java   | 16 ++++++--
 .../pulsar/client/impl/ConnectionPoolTest.java     | 43 ++++++++++++++++++++--
 .../pulsar/client/impl/PulsarTestClient.java       |  2 +-
 .../apache/pulsar/compaction/CompactorTest.java    |  8 +++-
 .../pulsar/client/impl/ConnectionHandler.java      |  4 +-
 .../apache/pulsar/client/impl/ConnectionPool.java  | 16 ++++++--
 .../pulsar/client/impl/PulsarClientImpl.java       | 17 +++++++--
 .../impl/AcknowledgementsGroupingTrackerTest.java  |  2 +
 .../client/impl/AutoClusterFailoverTest.java       | 19 +++++++---
 .../client/impl/BatchMessageContainerImplTest.java |  2 +
 .../client/impl/ConsumerBuilderImplTest.java       |  2 +
 .../client/impl/ControlledClusterFailoverTest.java |  3 ++
 .../client/impl/PartitionedProducerImplTest.java   |  2 +
 .../client/impl/ProducerBuilderImplTest.java       |  2 +
 .../client/impl/ProducerStatsRecorderImplTest.java |  4 ++
 .../pulsar/client/impl/PulsarClientImplTest.java   |  3 +-
 .../client/impl/UnAckedMessageTrackerTest.java     |  2 +
 .../MultiVersionSchemaInfoProviderTest.java        |  3 ++
 .../pulsar/functions/instance/ContextImplTest.java |  3 ++
 .../pulsar/functions/sink/PulsarSinkTest.java      |  4 ++
 .../pulsar/functions/source/PulsarSourceTest.java  |  7 +++-
 .../pulsar/functions/worker/LeaderServiceTest.java |  4 +-
 .../functions/worker/MembershipManagerTest.java    |  4 +-
 26 files changed, 171 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 3f9a083787b..a0bfef3696a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -61,6 +61,8 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
     private final PulsarService pulsarService;
     private final PulsarClientImpl pulsarClient;
 
+    private final int randomKeyForSelectConnection;
+
     private static final 
AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER 
=
             
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, 
"requestCredits");
     private volatile int requestCredits;
@@ -74,6 +76,7 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
         this.operationTimeoutInMills = operationTimeoutInMills;
         this.timer = timer;
         this.requestCredits = Math.max(100, maxConcurrentRequests);
+        this.randomKeyForSelectConnection = 
pulsarClient.getCnxPool().genRandomKeyToSelectCon();
     }
 
     @Override
@@ -296,7 +299,7 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
     }
 
     public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
-        return pulsarClient.getConnection(topic);
+        return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
     }
 
     public CompletableFuture<ClientCnx> getClientCnx(String topic) {
@@ -317,7 +320,8 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
                                 }
                                 InetSocketAddress brokerAddress =
                                         
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
-                                return 
pulsarClient.getConnection(brokerAddress, brokerAddress);
+                                return 
pulsarClient.getConnection(brokerAddress, brokerAddress,
+                                        randomKeyForSelectConnection);
                             } else {
                                 // Bundle is unloading, lookup topic
                                 return getClientCnxWithLookup(topic);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 018f3df6547..3c41161c3cd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -62,8 +63,11 @@ public class AbstractReplicatorTest {
         final PulsarService pulsar = mock(PulsarService.class);
         final BrokerService broker = mock(BrokerService.class);
         final Topic localTopic = mock(Topic.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
         final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+        when(localClient.getCnxPool()).thenReturn(connectionPool);
         final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+        when(remoteClient.getCnxPool()).thenReturn(connectionPool);
         final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
         final ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
         when(broker.executor()).thenReturn(eventLoopGroup);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 66460778dc2..430fd3bd1bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
@@ -42,6 +44,7 @@ import 
org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import 
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.TopicName;
@@ -151,11 +154,15 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
     @Test
     public void testTransactionBufferClientTimeout() throws Exception {
         PulsarService pulsarService = pulsarServiceList.get(0);
-        PulsarClient mockClient = mock(PulsarClientImpl.class);
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
         CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
         
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
+        when(((PulsarClientImpl)mockClient).getConnection(anyString(), 
anyInt())).thenReturn(completableFuture);
+        when(((PulsarClientImpl)mockClient).getConnection(any(), any(), 
anyInt())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
@@ -182,7 +189,9 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
         ConcurrentSkipListMap<Long, Object> outstandingRequests =
                 (ConcurrentSkipListMap<Long, Object>) 
field.get(transactionBufferHandler);
 
-        assertEquals(outstandingRequests.size(), 1);
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(outstandingRequests.size(), 1);
+        });
 
         Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> {
             if (outstandingRequests.size() == 0) {
@@ -202,11 +211,13 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
     @Test
     public void testTransactionBufferChannelUnActive() throws 
PulsarServerException {
         PulsarService pulsarService = pulsarServiceList.get(0);
-        PulsarClient mockClient = mock(PulsarClientImpl.class);
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
         CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
-        
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
+        when(((PulsarClientImpl)mockClient).getConnection(anyString(), 
anyInt())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index af4e442f617..c00505c3ced 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -24,6 +24,7 @@ import 
org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -31,8 +32,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -46,7 +47,9 @@ public class TransactionBufferHandlerImplTest {
 
     @Test
     public void testRequestCredits() throws PulsarServerException {
-        PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         PulsarService pulsarService = mock(PulsarService.class);
         NamespaceService namespaceService = mock(NamespaceService.class);
         when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
@@ -54,7 +57,10 @@ public class TransactionBufferHandlerImplTest {
         
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
         Optional<NamespaceEphemeralData> opData = Optional.empty();
         
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
-        
when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+        when(((PulsarClientImpl)pulsarClient).getConnection(anyString(), 
anyInt()))
+                
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+        when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
+                
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
         TransactionBufferHandlerImpl handler = spy(new 
TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
         doNothing().when(handler).endTxn(any());
         
doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString());
@@ -75,7 +81,9 @@ public class TransactionBufferHandlerImplTest {
 
     @Test
     public void testMinRequestCredits() throws PulsarServerException {
-        PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         PulsarService pulsarService = mock(PulsarService.class);
         when(pulsarService.getClient()).thenReturn(pulsarClient);
         TransactionBufferHandlerImpl handler = spy(new 
TransactionBufferHandlerImpl(pulsarService, null, 50, 3000));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index f11ab69b32f..419007432d0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -32,9 +32,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import io.netty.util.concurrent.Promise;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -81,6 +85,36 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
         eventLoop.shutdownGracefully();
     }
 
+    @Test
+    public void testSelectConnectionForSameProducer() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://sample/standalone/ns/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        final CommandCloseProducer commandCloseProducer = new 
CommandCloseProducer();
+        // 10 connection per broker.
+        final PulsarClient clientWith10ConPerBroker = 
PulsarClient.builder().connectionsPerBroker(10)
+                        .serviceUrl(lookupUrl.toString()).build();
+        ProducerImpl producer = (ProducerImpl) 
clientWith10ConPerBroker.newProducer().topic(topicName).create();
+        commandCloseProducer.setProducerId(producer.producerId);
+        // An error will be reported when the Producer reconnects using a 
different connection.
+        // If no error is reported, the same connection was used when 
reconnecting.
+        for (int i = 0; i < 20; i++) {
+            // Trigger reconnect
+            ClientCnx cnx = producer.getClientCnx();
+            if (cnx != null) {
+                cnx.handleCloseProducer(commandCloseProducer);
+                Awaitility.await().untilAsserted(() ->
+                        Assert.assertEquals(producer.getState().toString(), 
HandlerState.State.Ready.toString(),
+                                "The producer uses a different connection when 
reconnecting")
+                );
+            }
+        }
+
+        // cleanup.
+        producer.close();
+        clientWith10ConPerBroker.close();
+        admin.topics().delete(topicName);
+    }
+
     @Test
     public void testDoubleIpAddress() throws Exception {
         ClientConfigurationData conf = new ClientConfigurationData();
@@ -201,14 +235,16 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
 
         ClientCnx cnx = pool.getConnection(
                 InetSocketAddress.createUnresolved("proxy", 9999),
-                InetSocketAddress.createUnresolved("proxy", 9999)).get();
+                InetSocketAddress.createUnresolved("proxy", 9999),
+                pool.genRandomKeyToSelectCon()).get();
         Assert.assertEquals(cnx.remoteHostName, "proxy");
         Assert.assertNull(cnx.proxyToTargetBrokerAddress);
         cnx.close();
 
         cnx = pool.getConnection(
                 InetSocketAddress.createUnresolved("broker", 9999),
-                InetSocketAddress.createUnresolved("proxy", 9999)).get();
+                InetSocketAddress.createUnresolved("proxy", 9999),
+                pool.genRandomKeyToSelectCon()).get();
         Assert.assertEquals(cnx.remoteHostName, "proxy");
         Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999");
         cnx.close();
@@ -216,7 +252,8 @@ public class ConnectionPoolTest extends 
MockedPulsarServiceBaseTest {
 
         cnx = pool.getConnection(
                 InetSocketAddress.createUnresolved("broker", 9999),
-                InetSocketAddress.createUnresolved("broker", 9999)).get();
+                InetSocketAddress.createUnresolved("broker", 9999),
+                pool.genRandomKeyToSelectCon()).get();
         Assert.assertEquals(cnx.remoteHostName, "broker");
         Assert.assertNull(cnx.proxyToTargetBrokerAddress);
         cnx.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index 2177d1b2db5..324539e018d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -122,7 +122,7 @@ public class PulsarTestClient extends PulsarClientImpl {
             result.completeExceptionally(new IOException("New connections are 
rejected."));
             return result;
         } else {
-            return super.getConnection(topic);
+            return super.getConnection(topic, 
getCnxPool().genRandomKeyToSelectCon());
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 304aae3f57f..e3a788f36c1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.compaction;
 
 import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -47,6 +49,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -244,7 +247,10 @@ public class CompactorTest extends 
MockedPulsarServiceBaseTest {
     public void testPhaseOneLoopTimeConfiguration() {
         ServiceConfiguration configuration = new ServiceConfiguration();
         configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
-        TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, 
Mockito.mock(PulsarClientImpl.class),
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
+        TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, 
mockClient,
                 Mockito.mock(BookKeeper.class), compactionScheduler);
         Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 
60);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 32a1b0d8240..251cafa5406 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -42,6 +42,7 @@ public class ConnectionHandler {
     private volatile long epoch = -1L;
     protected volatile long lastConnectionClosedTimestamp = 0L;
     private final AtomicBoolean duringConnect = new AtomicBoolean(false);
+    protected final int randomKeyForSelectConnection;
 
     interface Connection {
 
@@ -57,6 +58,7 @@ public class ConnectionHandler {
 
     protected ConnectionHandler(HandlerState state, Backoff backoff, 
Connection connection) {
         this.state = state;
+        this.randomKeyForSelectConnection = 
state.client.getCnxPool().genRandomKeyToSelectCon();
         this.connection = connection;
         this.backoff = backoff;
         CLIENT_CNX_UPDATER.set(this, null);
@@ -83,7 +85,7 @@ public class ConnectionHandler {
         }
 
         try {
-            state.client.getConnection(state.topic) //
+            state.client.getConnection(state.topic, 
randomKeyForSelectConnection) //
                     .thenCompose(cnx -> connection.connectionOpened(cnx)) //
                     .thenAccept(__ -> duringConnect.set(false))
                     .exceptionally(this::handleConnectionError);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 659b54fe747..675d2773c67 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -127,8 +127,18 @@ public class ConnectionPool implements AutoCloseable {
 
     private static final Random random = new Random();
 
+    public int genRandomKeyToSelectCon() {
+        if (maxConnectionsPerHosts == 0) {
+            return -1;
+        }
+        return signSafeMod(random.nextInt(), maxConnectionsPerHosts);
+    }
+
     public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress 
address) {
-        return getConnection(address, address);
+        if (maxConnectionsPerHosts == 0) {
+            return getConnection(address, address, -1);
+        }
+        return getConnection(address, address, signSafeMod(random.nextInt(), 
maxConnectionsPerHosts));
     }
 
     void closeAllConnections() {
@@ -166,14 +176,12 @@ public class ConnectionPool implements AutoCloseable {
      * @return a future that will produce the ClientCnx object
      */
     public CompletableFuture<ClientCnx> getConnection(InetSocketAddress 
logicalAddress,
-            InetSocketAddress physicalAddress) {
+            InetSocketAddress physicalAddress, final int randomKey) {
         if (maxConnectionsPerHosts == 0) {
             // Disable pooling
             return createConnection(logicalAddress, physicalAddress, -1);
         }
 
-        final int randomKey = signSafeMod(random.nextInt(), 
maxConnectionsPerHosts);
-
         final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
                 pool.computeIfAbsent(logicalAddress, a -> new 
ConcurrentHashMap<>());
         CompletableFuture<ClientCnx> completableFuture = innerPool
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index a2e63c2cdc6..4c1a301fe3a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -898,15 +898,26 @@ public class PulsarClientImpl implements PulsarClient {
         conf.setTlsTrustStorePassword(tlsTrustStorePassword);
     }
 
+    public CompletableFuture<ClientCnx> getConnection(final String topic, int 
randomKeyForSelectConnection) {
+        TopicName topicName = TopicName.get(topic);
+        return lookup.getBroker(topicName)
+                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight(), randomKeyForSelectConnection));
+    }
+
+    /**
+     * Only for test.
+     */
+    @VisibleForTesting
     public CompletableFuture<ClientCnx> getConnection(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return lookup.getBroker(topicName)
-                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight()));
+                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
     }
 
     public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress 
logicalAddress,
-                                                      final InetSocketAddress 
physicalAddress) {
-        return cnxPool.getConnection(logicalAddress, physicalAddress);
+                                                      final InetSocketAddress 
physicalAddress,
+                                                      final int 
randomKeyForSelectConnection) {
+        return cnxPool.getConnection(logicalAddress, physicalAddress, 
randomKeyForSelectConnection);
     }
 
     /** visible for pulsar-functions. **/
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index d577f48357c..81e51424230 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -64,6 +64,8 @@ public class AcknowledgementsGroupingTrackerTest {
                 ConcurrentOpenHashMap.<MessageIdImpl, 
MessageIdImpl[]>newBuilder().build();
         cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new 
NioEventLoopGroup()));
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         doReturn(client).when(consumer).getClient();
         doReturn(cnx).when(consumer).getClientCnx();
         doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index e42d8edf34b..62d5309ef5c 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,8 +36,7 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertTrue;
 
 @Test(groups = "broker-impl")
 @Slf4j
@@ -121,6 +122,8 @@ public class AutoClusterFailoverTest {
 
         AutoClusterFailover autoClusterFailover = 
Mockito.spy((AutoClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
         
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -130,14 +133,14 @@ public class AutoClusterFailoverTest {
         for (int i = 0; i < 2; i++) {
             Awaitility.await().untilAsserted(() ->
                     Assert.assertEquals(secondary, 
autoClusterFailover.getServiceUrl()));
-            assertEquals(-1, autoClusterFailover.getFailedTimestamp());
+            assertTrue(-1L == autoClusterFailover.getFailedTimestamp());
 
             // primary cluster came back
             
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(primary);
             Awaitility.await().untilAsserted(() ->
                     Assert.assertEquals(primary, 
autoClusterFailover.getServiceUrl()));
-            assertEquals(-1, autoClusterFailover.getRecoverTimestamp());
-            assertEquals(-1, autoClusterFailover.getFailedTimestamp());
+            assertTrue(-1L == autoClusterFailover.getRecoverTimestamp());
+            assertTrue(-1L == autoClusterFailover.getFailedTimestamp());
 
             
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         }
@@ -163,6 +166,8 @@ public class AutoClusterFailoverTest {
 
         AutoClusterFailover autoClusterFailover = 
Mockito.spy((AutoClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
         
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -219,6 +224,8 @@ public class AutoClusterFailoverTest {
 
         AutoClusterFailover autoClusterFailover = 
Mockito.spy((AutoClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
         
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -274,6 +281,8 @@ public class AutoClusterFailoverTest {
 
         AutoClusterFailover autoClusterFailover = 
Mockito.spy((AutoClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
         
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
         
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 7241819cfa0..dc5273ba54f 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -79,6 +79,8 @@ public class BatchMessageContainerImplTest {
         final ProducerConfigurationData producerConfigurationData = new 
ProducerConfigurationData();
         producerConfigurationData.setCompressionType(CompressionType.NONE);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         MemoryLimitController memoryLimitController = 
mock(MemoryLimitController.class);
         
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
         try {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index ab53c6759b2..a2c76a8956e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -72,6 +72,8 @@ public class ConsumerBuilderImplTest {
     @BeforeMethod(alwaysRun = true)
     public void setup() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         ConsumerConfigurationData consumerConfigurationData = 
mock(ConsumerConfigurationData.class);
         
when(consumerConfigurationData.getTopicsPattern()).thenReturn(Pattern.compile("\\w+"));
         
when(consumerConfigurationData.getSubscriptionName()).thenReturn("testSubscriptionName");
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index fbd239e1137..974c31cc82e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -31,6 +31,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @Test(groups = "broker-impl")
 public class ControlledClusterFailoverTest {
@@ -88,6 +89,8 @@ public class ControlledClusterFailoverTest {
 
         ControlledClusterFailover controlledClusterFailover = 
Mockito.spy((ControlledClusterFailover) provider);
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
 
         controlledClusterFailover.initialize(pulsarClient);
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index aceea081070..9cdcc8c031e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -70,6 +70,8 @@ public class PartitionedProducerImplTest {
     @BeforeMethod(alwaysRun = true)
     public void setup() {
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         schema = mock(Schema.class);
         producerInterceptors = mock(ProducerInterceptors.class);
         producerCreatedFuture = new CompletableFuture<>();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index c5ef1a9a9fd..9d6ec1f8344 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -53,6 +53,8 @@ public class ProducerBuilderImplTest {
     public void setup() {
         Producer producer = mock(Producer.class);
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
         when(client.newProducer()).thenReturn(producerBuilderImpl);
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index f6e7f284ce6..964a9b5558b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -40,6 +40,8 @@ public class ProducerStatsRecorderImplTest {
         ClientConfigurationData conf = new ClientConfigurationData();
         conf.setStatsIntervalSeconds(1);
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.getConfiguration()).thenReturn(conf);
         Timer timer = new HashedWheelTimer();
         when(client.timer()).thenReturn(timer);
@@ -60,6 +62,8 @@ public class ProducerStatsRecorderImplTest {
         ClientConfigurationData conf = new ClientConfigurationData();
         conf.setStatsIntervalSeconds(60);
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.getConfiguration()).thenReturn(conf);
         Timer timer = new HashedWheelTimer();
         when(client.timer()).thenReturn(timer);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index a506066df8b..c519941cf13 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -131,7 +132,7 @@ public class PulsarClientImplTest {
         when(cnx.ctx()).thenReturn(ctx);
         when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong()))
                 
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
-        when(pool.getConnection(any(InetSocketAddress.class), 
any(InetSocketAddress.class)))
+        when(pool.getConnection(any(InetSocketAddress.class), 
any(InetSocketAddress.class), anyInt()))
                 .thenReturn(CompletableFuture.completedFuture(cnx));
         Whitebox.setInternalState(clientImpl, "cnxPool", pool);
         Whitebox.setInternalState(clientImpl, "lookup", lookup);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
index 323333afc74..2e2cf29b2ca 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
@@ -42,6 +42,8 @@ public class UnAckedMessageTrackerTest  {
     @Test
     public void testAddAndRemove() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
                 1, TimeUnit.MILLISECONDS);
         when(client.timer()).thenReturn(timer);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index c81ff146004..1d739524f71 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -27,6 +27,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -46,6 +47,8 @@ public class MultiVersionSchemaInfoProviderTest {
     @BeforeMethod
     public void setup() {
         PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.getLookup()).thenReturn(mock(LookupService.class));
         schemaProvider = new MultiVersionSchemaInfoProvider(
                 TopicName.get("persistent://public/default/my-topic"), client);
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 49bc4161c30..d0c189b5fbd 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.ProducerBase;
@@ -93,6 +94,8 @@ public class ContextImplTest {
 
         producer = mock(Producer.class);
         client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.getCnxPool()).thenReturn(connectionPool);
         when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, 
Schema.BYTES));
         when(client.createProducerAsync(any(ProducerConfigurationData.class), 
any(), any()))
                 .thenReturn(CompletableFuture.completedFuture(producer));
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 5e52c89f5fc..936c67b37c0 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
@@ -59,6 +60,7 @@ import 
org.apache.pulsar.client.api.schema.GenericRecordBuilder;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -98,6 +100,8 @@ public class PulsarSinkTest {
      */
     private static PulsarClientImpl getPulsarClient() throws 
PulsarClientException {
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
         doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
         
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 4ac7d53cb7a..745f0ee7ee3 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.functions.source;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertSame;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
@@ -45,6 +48,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.ConsumerConfig;
@@ -54,7 +58,6 @@ import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.io.core.SourceContext;
 import org.testng.Assert;
 import org.mockito.ArgumentMatcher;
-import static org.testng.Assert.assertSame;
 
 import org.mockito.Mockito;
 import org.testng.annotations.DataProvider;
@@ -108,6 +111,8 @@ public class PulsarSourceTest {
      */
     private static PulsarClientImpl getPulsarClient() throws 
PulsarClientException {
         PulsarClientImpl pulsarClient = Mockito.mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
         ConsumerBuilder<?> goodConsumerBuilder = 
Mockito.mock(ConsumerBuilder.class);
         ConsumerBuilder<?> badConsumerBuilder = 
Mockito.mock(ConsumerBuilder.class);
         
Mockito.doReturn(goodConsumerBuilder).when(goodConsumerBuilder).topics(Mockito.argThat(new
 TopicMatcher("persistent://sample/ns1/test_result")));
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
index 57f2dca4922..e36825f1a7b 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -78,7 +79,8 @@ public class LeaderServiceTest {
     @BeforeMethod
     public void setup() throws PulsarClientException {
         mockClient = mock(PulsarClientImpl.class);
-
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
         mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 84832e74df8..66db2334389 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.WorkerInfo;
@@ -70,7 +71,8 @@ public class MembershipManagerTest {
 
     private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
-
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(mockClient.getCnxPool()).thenReturn(connectionPool);
         ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
         ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
 

Reply via email to