This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 6a605c8a127 [fix] [client] fix same producer/consumer use more than
one connection per broker (#21144)
6a605c8a127 is described below
commit 6a605c8a127462257eec7e8bf21ad7f0c7b90bd9
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 11 21:06:05 2023 +0800
[fix] [client] fix same producer/consumer use more than one connection per
broker (#21144)
Motivation: Pulsar has two mechanisms to guarantee that a producer connects
to the broker multiple times the result is still correct.
- In a connection, the second connection waits for the first connection to
complete.
- In a topic, the second connection will override the previous one.
However, if a producer can use different connections to connect to the
broker, these two mechanisms will not work.
When the config `connectionsPerBroker` of `PulsarClient` is larger than
`1`, a producer could use more than one connection, leading to the error above.
You can reproduce this issue by the test `testSelectConnectionForSameProducer.`
Modifications: Make the same producer/consumer usage the same connection
---
.../buffer/impl/TransactionBufferHandlerImpl.java | 8 +++-
.../broker/service/AbstractReplicatorTest.java | 4 ++
.../pulsar/broker/service/PersistentTopicTest.java | 3 ++
.../testcontext/NonStartableTestPulsarService.java | 7 +++-
.../buffer/TransactionBufferClientTest.java | 22 +++++++++--
.../buffer/TransactionBufferHandlerImplTest.java | 16 ++++++--
.../pulsar/client/impl/ConnectionPoolTest.java | 43 ++++++++++++++++++++--
.../pulsar/client/impl/PulsarTestClient.java | 2 +-
.../apache/pulsar/compaction/CompactorTest.java | 8 +++-
.../pulsar/client/impl/ConnectionHandler.java | 6 ++-
.../apache/pulsar/client/impl/ConnectionPool.java | 16 ++++++--
.../pulsar/client/impl/PulsarClientImpl.java | 19 ++++++++--
.../impl/AcknowledgementsGroupingTrackerTest.java | 2 +
.../client/impl/AutoClusterFailoverTest.java | 9 +++++
.../client/impl/BatchMessageContainerImplTest.java | 4 ++
.../pulsar/client/impl/ClientTestFixtures.java | 11 +++++-
.../client/impl/ConsumerBuilderImplTest.java | 4 ++
.../client/impl/ControlledClusterFailoverTest.java | 3 ++
.../client/impl/PartitionedProducerImplTest.java | 2 +
.../client/impl/ProducerBuilderImplTest.java | 2 +
.../client/impl/ProducerStatsRecorderImplTest.java | 4 ++
.../pulsar/client/impl/PulsarClientImplTest.java | 3 +-
.../client/impl/TableViewBuilderImplTest.java | 2 +
.../pulsar/client/impl/TableViewImplTest.java | 2 +
.../pulsar/client/impl/TopicListWatcherTest.java | 10 ++++-
.../client/impl/UnAckedMessageTrackerTest.java | 4 ++
.../MultiVersionSchemaInfoProviderTest.java | 3 ++
.../pulsar/functions/instance/ContextImplTest.java | 3 ++
.../pulsar/functions/sink/PulsarSinkTest.java | 4 ++
.../pulsar/functions/source/PulsarSourceTest.java | 5 +++
.../pulsar/functions/worker/LeaderServiceTest.java | 4 +-
.../functions/worker/MembershipManagerTest.java | 4 +-
32 files changed, 207 insertions(+), 32 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 48dcf259edb..625d27329d3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -61,6 +61,8 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
private final PulsarService pulsarService;
private final PulsarClientImpl pulsarClient;
+ private final int randomKeyForSelectConnection;
+
private static final
AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER
=
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class,
"requestCredits");
private volatile int requestCredits;
@@ -74,6 +76,7 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
this.operationTimeoutInMills = operationTimeoutInMills;
this.timer = timer;
this.requestCredits = Math.max(100, maxConcurrentRequests);
+ this.randomKeyForSelectConnection =
pulsarClient.getCnxPool().genRandomKeyToSelectCon();
}
@Override
@@ -296,7 +299,7 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
}
public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
- return pulsarClient.getConnection(topic);
+ return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
}
public CompletableFuture<ClientCnx> getClientCnx(String topic) {
@@ -317,7 +320,8 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
}
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
- return
pulsarClient.getConnection(brokerAddress, brokerAddress);
+ return
pulsarClient.getConnection(brokerAddress, brokerAddress,
+ randomKeyForSelectConnection);
} else {
// Bundle is unloading, lookup topic
return getClientCnxWithLookup(topic);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 294a9b341ec..f8034c37971 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
@@ -62,8 +63,11 @@ public class AbstractReplicatorTest {
final PulsarService pulsar = mock(PulsarService.class);
final BrokerService broker = mock(BrokerService.class);
final Topic localTopic = mock(Topic.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+ when(localClient.getCnxPool()).thenReturn(connectionPool);
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+ when(remoteClient.getCnxPool()).thenReturn(connectionPool);
final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
when(broker.executor()).thenReturn(eventLoopGroup);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 078208f7e44..71310fef810 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -105,6 +105,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -1713,6 +1714,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
PulsarClientImpl pulsarClientMock = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClientMock.getCnxPool()).thenReturn(connectionPool);
when(pulsarClientMock.newProducer(any())).thenAnswer(
invocation -> {
ProducerBuilderImpl producerBuilder =
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 2896f338e4a..2027bb33bf1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.testcontext;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Collections;
@@ -42,6 +43,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
@@ -77,7 +79,10 @@ class NonStartableTestPulsarService extends
AbstractTestPulsarService {
throw new RuntimeException(e);
}
setSchemaRegistryService(spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class));
- setClient(mock(PulsarClientImpl.class));
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
+ setClient(mockClient);
this.namespaceService = mock(NamespaceService.class);
try {
startNamespaceService();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 2cfc9f46f0e..3873d9d37b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -51,6 +53,7 @@ import
org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -253,14 +256,21 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
assertEquals(pending.size(), 1);
}
+ /**
+ * This is a flaky test.
+ */
@Test
public void testTransactionBufferClientTimeout() throws Exception {
PulsarService pulsarService = pulsarServiceList.get(0);
- PulsarClient mockClient = mock(PulsarClientImpl.class);
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
CompletableFuture<ClientCnx> completableFuture = new
CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
+ when(((PulsarClientImpl)mockClient).getConnection(anyString(),
anyInt())).thenReturn(completableFuture);
+ when(((PulsarClientImpl)mockClient).getConnection(any(), any(),
anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
@@ -287,7 +297,9 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
ConcurrentSkipListMap<Long, Object> outstandingRequests =
(ConcurrentSkipListMap<Long, Object>)
field.get(transactionBufferHandler);
- assertEquals(outstandingRequests.size(), 1);
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(outstandingRequests.size(), 1);
+ });
Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> {
if (outstandingRequests.size() == 0) {
@@ -307,11 +319,13 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
@Test
public void testTransactionBufferChannelUnActive() throws
PulsarServerException {
PulsarService pulsarService = pulsarServiceList.get(0);
- PulsarClient mockClient = mock(PulsarClientImpl.class);
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
CompletableFuture<ClientCnx> completableFuture = new
CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
-
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
+ when(((PulsarClientImpl)mockClient).getConnection(anyString(),
anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index d6ec092c445..278cdbac1f0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -24,6 +24,7 @@ import
org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -31,8 +32,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -46,7 +47,9 @@ public class TransactionBufferHandlerImplTest {
@Test
public void testRequestCredits() throws PulsarServerException {
- PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+ PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
PulsarService pulsarService = mock(PulsarService.class);
NamespaceService namespaceService = mock(NamespaceService.class);
when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
@@ -54,7 +57,10 @@ public class TransactionBufferHandlerImplTest {
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
Optional<NamespaceEphemeralData> opData = Optional.empty();
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
-
when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+ when(((PulsarClientImpl)pulsarClient).getConnection(anyString(),
anyInt()))
+
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+ when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
+
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
TransactionBufferHandlerImpl handler = spy(new
TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString());
@@ -75,7 +81,9 @@ public class TransactionBufferHandlerImplTest {
@Test
public void testMinRequestCredits() throws PulsarServerException {
- PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
PulsarService pulsarService = mock(PulsarService.class);
when(pulsarService.getClient()).thenReturn(pulsarClient);
TransactionBufferHandlerImpl handler = spy(new
TransactionBufferHandlerImpl(pulsarService, null, 50, 3000));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index fe0aa4dd495..79ffada4a90 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -31,9 +31,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import io.netty.util.concurrent.Promise;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -80,6 +84,36 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
eventLoop.shutdownGracefully();
}
+ @Test
+ public void testSelectConnectionForSameProducer() throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://sample/standalone/ns/tp_");
+ admin.topics().createNonPartitionedTopic(topicName);
+ final CommandCloseProducer commandCloseProducer = new
CommandCloseProducer();
+ // 10 connection per broker.
+ final PulsarClient clientWith10ConPerBroker =
PulsarClient.builder().connectionsPerBroker(10)
+ .serviceUrl(lookupUrl.toString()).build();
+ ProducerImpl producer = (ProducerImpl)
clientWith10ConPerBroker.newProducer().topic(topicName).create();
+ commandCloseProducer.setProducerId(producer.producerId);
+ // An error will be reported when the Producer reconnects using a
different connection.
+ // If no error is reported, the same connection was used when
reconnecting.
+ for (int i = 0; i < 20; i++) {
+ // Trigger reconnect
+ ClientCnx cnx = producer.getClientCnx();
+ if (cnx != null) {
+ cnx.handleCloseProducer(commandCloseProducer);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(producer.getState().toString(),
HandlerState.State.Ready.toString(),
+ "The producer uses a different connection when
reconnecting")
+ );
+ }
+ }
+
+ // cleanup.
+ producer.close();
+ clientWith10ConPerBroker.close();
+ admin.topics().delete(topicName);
+ }
+
@Test
public void testDoubleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
@@ -205,20 +239,23 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
ClientCnx cnx = pool.getConnection(
InetSocketAddress.createUnresolved("proxy", 9999),
- InetSocketAddress.createUnresolved("proxy", 9999)).get();
+ InetSocketAddress.createUnresolved("proxy", 9999),
+ pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);
cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker1", 9999),
- InetSocketAddress.createUnresolved("proxy", 9999)).get();
+ InetSocketAddress.createUnresolved("proxy", 9999),
+ pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker1:9999");
cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker2", 9999),
- InetSocketAddress.createUnresolved("broker2", 9999)).get();
+ InetSocketAddress.createUnresolved("broker2", 9999),
+ pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "broker2");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index a725562ac40..6555c152bac 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -122,7 +122,7 @@ public class PulsarTestClient extends PulsarClientImpl {
result.completeExceptionally(new IOException("New connections are
rejected."));
return result;
} else {
- return super.getConnection(topic);
+ return super.getConnection(topic,
getCnxPool().genRandomKeyToSelectCon());
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index d97d22ae7f1..4e442ac0513 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.compaction;
import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -258,7 +261,10 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
public void testPhaseOneLoopTimeConfiguration() {
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
- TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration,
Mockito.mock(PulsarClientImpl.class),
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
+ TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration,
mockClient,
Mockito.mock(BookKeeper.class), compactionScheduler);
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(),
60);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 263507dac1d..fc7c89c3ce6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -43,6 +43,7 @@ public class ConnectionHandler {
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
private final AtomicBoolean duringConnect = new AtomicBoolean(false);
+ protected final int randomKeyForSelectConnection;
interface Connection {
@@ -58,6 +59,7 @@ public class ConnectionHandler {
protected ConnectionHandler(HandlerState state, Backoff backoff,
Connection connection) {
this.state = state;
+ this.randomKeyForSelectConnection =
state.client.getCnxPool().genRandomKeyToSelectCon();
this.connection = connection;
this.backoff = backoff;
CLIENT_CNX_UPDATER.set(this, null);
@@ -88,11 +90,11 @@ public class ConnectionHandler {
if (state.redirectedClusterURI != null) {
InetSocketAddress address =
InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
state.redirectedClusterURI.getPort());
- cnxFuture = state.client.getConnection(address, address);
+ cnxFuture = state.client.getConnection(address, address,
randomKeyForSelectConnection);
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
- cnxFuture = state.client.getConnection(state.topic); //
+ cnxFuture = state.client.getConnection(state.topic,
randomKeyForSelectConnection);
}
cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
.thenAccept(__ -> duringConnect.set(false))
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 1420d81c688..ef3a9249c82 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -165,8 +165,18 @@ public class ConnectionPool implements AutoCloseable {
private static final Random random = new Random();
+ public int genRandomKeyToSelectCon() {
+ if (maxConnectionsPerHosts == 0) {
+ return -1;
+ }
+ return signSafeMod(random.nextInt(), maxConnectionsPerHosts);
+ }
+
public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress
address) {
- return getConnection(address, address);
+ if (maxConnectionsPerHosts == 0) {
+ return getConnection(address, address, -1);
+ }
+ return getConnection(address, address, signSafeMod(random.nextInt(),
maxConnectionsPerHosts));
}
void closeAllConnections() {
@@ -204,14 +214,12 @@ public class ConnectionPool implements AutoCloseable {
* @return a future that will produce the ClientCnx object
*/
public CompletableFuture<ClientCnx> getConnection(InetSocketAddress
logicalAddress,
- InetSocketAddress physicalAddress) {
+ InetSocketAddress physicalAddress, final int randomKey) {
if (maxConnectionsPerHosts == 0) {
// Disable pooling
return createConnection(logicalAddress, physicalAddress, -1);
}
- final int randomKey = signSafeMod(random.nextInt(),
maxConnectionsPerHosts);
-
final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
pool.computeIfAbsent(logicalAddress, a -> new
ConcurrentHashMap<>());
CompletableFuture<ClientCnx> completableFuture = innerPool
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6c749a8cf43..fdabb5fa8cf 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -944,10 +944,20 @@ public class PulsarClientImpl implements PulsarClient {
conf.setTlsTrustStorePassword(tlsTrustStorePassword);
}
+ public CompletableFuture<ClientCnx> getConnection(final String topic, int
randomKeyForSelectConnection) {
+ TopicName topicName = TopicName.get(topic);
+ return lookup.getBroker(topicName)
+ .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight(), randomKeyForSelectConnection));
+ }
+
+ /**
+ * Only for test.
+ */
+ @VisibleForTesting
public CompletableFuture<ClientCnx> getConnection(final String topic) {
TopicName topicName = TopicName.get(topic);
return lookup.getBroker(topicName)
- .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight()));
+ .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
}
public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
@@ -956,12 +966,13 @@ public class PulsarClientImpl implements PulsarClient {
"Can't get client connection to HTTP service URL", null));
}
InetSocketAddress address = lookup.resolveHost();
- return getConnection(address, address);
+ return getConnection(address, address,
cnxPool.genRandomKeyToSelectCon());
}
public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress
logicalAddress,
- final InetSocketAddress
physicalAddress) {
- return cnxPool.getConnection(logicalAddress, physicalAddress);
+ final InetSocketAddress
physicalAddress,
+ final int
randomKeyForSelectConnection) {
+ return cnxPool.getConnection(logicalAddress, physicalAddress,
randomKeyForSelectConnection);
}
/** visible for pulsar-functions. **/
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 0418a54c772..1d1a6f85bfd 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -65,6 +65,8 @@ public class AcknowledgementsGroupingTrackerTest {
ConcurrentOpenHashMap.<MessageIdAdv,
MessageIdImpl[]>newBuilder().build();
cnx = spy(new ClientCnxTest(new ClientConfigurationData(),
eventLoopGroup));
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
doReturn(client).when(consumer).getClient();
doReturn(cnx).when(consumer).getClientCnx();
doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index 36ffa30296b..63fbb239439 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -121,6 +122,8 @@ public class AutoClusterFailoverTest {
AutoClusterFailover autoClusterFailover =
Mockito.spy((AutoClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -163,6 +166,8 @@ public class AutoClusterFailoverTest {
AutoClusterFailover autoClusterFailover =
Mockito.spy((AutoClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -217,6 +222,8 @@ public class AutoClusterFailoverTest {
AutoClusterFailover autoClusterFailover =
Mockito.spy((AutoClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -270,6 +277,8 @@ public class AutoClusterFailoverTest {
AutoClusterFailover autoClusterFailover =
Mockito.spy((AutoClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 4b80e19c256..abb195c9830 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -105,6 +105,8 @@ public class BatchMessageContainerImplTest {
final ProducerConfigurationData producerConfigurationData = new
ProducerConfigurationData();
producerConfigurationData.setCompressionType(CompressionType.NONE);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
MemoryLimitController memoryLimitController =
mock(MemoryLimitController.class);
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
try {
@@ -148,6 +150,8 @@ public class BatchMessageContainerImplTest {
final ProducerConfigurationData producerConfigurationData = new
ProducerConfigurationData();
producerConfigurationData.setCompressionType(CompressionType.NONE);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
MemoryLimitController memoryLimitController =
mock(MemoryLimitController.class);
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
try {
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 4db5dbe8776..ff7d7f12dd4 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.client.impl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.channel.ChannelHandlerContext;
@@ -72,11 +74,16 @@ class ClientTestFixtures {
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
- when(clientMock.getConnection(any(),
any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+
when(clientMock.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+ when(clientMock.getConnection(anyString(), anyInt()))
+ .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+ when(clientMock.getConnection(any(), any(), anyInt()))
+ .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
ConnectionPool connectionPoolMock = mock(ConnectionPool.class);
when(clientMock.getCnxPool()).thenReturn(connectionPoolMock);
when(connectionPoolMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
- when(connectionPoolMock.getConnection(any(),
any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+ when(connectionPoolMock.getConnection(any(), any(), anyInt()))
+ .thenReturn(CompletableFuture.completedFuture(clientCnxMock));
return clientMock;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 8dbd23f9c29..3fe13663046 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -76,6 +76,8 @@ public class ConsumerBuilderImplTest {
@BeforeMethod(alwaysRun = true)
public void setup() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
ConsumerConfigurationData consumerConfigurationData =
mock(ConsumerConfigurationData.class);
when(consumerConfigurationData.getTopicsPattern()).thenReturn(Pattern.compile("\\w+"));
when(consumerConfigurationData.getSubscriptionName()).thenReturn("testSubscriptionName");
@@ -104,6 +106,8 @@ public class ConsumerBuilderImplTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testConsumerBuilderImplWhenSchemaIsNull() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
ConsumerConfigurationData consumerConfigurationData =
mock(ConsumerConfigurationData.class);
new ConsumerBuilderImpl(client, consumerConfigurationData, null);
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index 570b1398328..227e0db10b7 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -31,6 +31,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
@Test(groups = "broker-impl")
public class ControlledClusterFailoverTest {
@@ -88,6 +89,8 @@ public class ControlledClusterFailoverTest {
ControlledClusterFailover controlledClusterFailover =
Mockito.spy((ControlledClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
controlledClusterFailover.initialize(pulsarClient);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index 223881d85a8..2bd18f69386 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -70,6 +70,8 @@ public class PartitionedProducerImplTest {
@BeforeMethod(alwaysRun = true)
public void setup() {
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
schema = mock(Schema.class);
producerInterceptors = mock(ProducerInterceptors.class);
producerCreatedFuture = new CompletableFuture<>();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index bb3e3fc3acc..b830d375303 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -52,6 +52,8 @@ public class ProducerBuilderImplTest {
public void setup() {
Producer<?> producer = mock(Producer.class);
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
when(client.newProducer()).thenReturn(producerBuilderImpl);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index 32d0eff6e79..27e2dcb37ce 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -40,6 +40,8 @@ public class ProducerStatsRecorderImplTest {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setStatsIntervalSeconds(1);
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.getConfiguration()).thenReturn(conf);
Timer timer = new HashedWheelTimer();
when(client.timer()).thenReturn(timer);
@@ -60,6 +62,8 @@ public class ProducerStatsRecorderImplTest {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setStatsIntervalSeconds(60);
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.getConfiguration()).thenReturn(conf);
Timer timer = new HashedWheelTimer();
when(client.timer()).thenReturn(timer);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 54d13538d78..e0b25db8912 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
@@ -122,7 +123,7 @@ public class PulsarClientImplTest {
when(cnx.ctx()).thenReturn(ctx);
when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong()))
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
- when(pool.getConnection(any(InetSocketAddress.class),
any(InetSocketAddress.class)))
+ when(pool.getConnection(any(InetSocketAddress.class),
any(InetSocketAddress.class), anyInt()))
.thenReturn(CompletableFuture.completedFuture(cnx));
ClientConfigurationData conf = new ClientConfigurationData();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
index 9959a203855..eee8ba4e8f4 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewBuilderImplTest.java
@@ -52,6 +52,8 @@ public class TableViewBuilderImplTest {
Reader reader = mock(Reader.class);
when(reader.readNextAsync()).thenReturn(CompletableFuture.allOf());
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.newReader(any(Schema.class)))
.thenReturn(new ReaderBuilderImpl(client, Schema.BYTES));
when(client.createReaderAsync(any(ReaderConfigurationData.class),
any(Schema.class)))
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java
index 68c886bc721..6a866034ddb 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java
@@ -38,6 +38,8 @@ public class TableViewImplTest {
@BeforeClass(alwaysRun = true)
public void setup() {
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.newReader(any(Schema.class)))
.thenReturn(new ReaderBuilderImpl(client, Schema.BYTES));
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
index e462bb4d62c..1b39448fbe7 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java
@@ -28,7 +28,9 @@ import
org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
import org.apache.pulsar.common.naming.NamespaceName;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -52,6 +54,9 @@ public class TopicListWatcherTest {
public void setup() {
listener = mock(TopicsChangedListener.class);
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
+ when(connectionPool.genRandomKeyToSelectCon()).thenReturn(0);
when(client.getConfiguration()).thenReturn(new
ClientConfigurationData());
clientCnxFuture = new CompletableFuture<>();
when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture);
@@ -59,6 +64,9 @@ public class TopicListWatcherTest {
when(client.timer()).thenReturn(timer);
String topic = "persistent://tenant/ns/topic\\d+";
when(client.getConnection(topic)).thenReturn(clientCnxFuture);
+ when(client.getConnection(topic, 0)).thenReturn(clientCnxFuture);
+ when(client.getConnection(any(), any(),
anyInt())).thenReturn(clientCnxFuture);
+ when(connectionPool.getConnection(any(), any(),
anyInt())).thenReturn(clientCnxFuture);
watcherFuture = new CompletableFuture<>();
watcher = new TopicListWatcher(listener, client,
Pattern.compile(topic), 7,
@@ -67,7 +75,7 @@ public class TopicListWatcherTest {
@Test
public void testWatcherGrabsConnection() {
- verify(client).getConnection(any());
+ verify(client).getConnection(anyString(), anyInt());
}
@Test
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
index f6c668703d9..91ad3210482 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
@@ -45,6 +45,8 @@ public class UnAckedMessageTrackerTest {
@Test
public void testAddAndRemove() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
Timer timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
1, TimeUnit.MILLISECONDS);
when(client.timer()).thenReturn(timer);
@@ -83,6 +85,8 @@ public class UnAckedMessageTrackerTest {
@Test
public void testTrackChunkedMessageId() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
Timer timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
1, TimeUnit.MILLISECONDS);
when(client.timer()).thenReturn(timer);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index 8959e670234..bfd6af37e3e 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -27,6 +27,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -46,6 +47,8 @@ public class MultiVersionSchemaInfoProviderTest {
@BeforeMethod
public void setup() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.getLookup()).thenReturn(mock(LookupService.class));
schemaProvider = new MultiVersionSchemaInfoProvider(
TopicName.get("persistent://public/default/my-topic"), client);
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index e0ebb52da74..90f7df37fa1 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBase;
@@ -99,6 +100,8 @@ public class ContextImplTest {
producer = mock(Producer.class);
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client,
Schema.BYTES));
when(client.createProducerAsync(any(ProducerConfigurationData.class),
any(), any()))
.thenReturn(CompletableFuture.completedFuture(producer));
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index fdac39512cc..799bad839a4 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
@@ -56,6 +57,7 @@ import
org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -95,6 +97,8 @@ public class PulsarSinkTest {
*/
private static PulsarClientImpl getPulsarClient() throws
PulsarClientException {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 91e4c06fe5b..5d6e4a3dc75 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.functions.source;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertSame;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
@@ -44,6 +46,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
@@ -105,6 +108,8 @@ public class PulsarSourceTest {
*/
private static PulsarClientImpl getPulsarClient() throws
PulsarClientException {
PulsarClientImpl pulsarClient = Mockito.mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
ConsumerBuilder<?> goodConsumerBuilder =
Mockito.mock(ConsumerBuilder.class);
ConsumerBuilder<?> badConsumerBuilder =
Mockito.mock(ConsumerBuilder.class);
Mockito.doReturn(goodConsumerBuilder).when(goodConsumerBuilder)
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
index 8da24fd1b72..5c10a59bd13 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -76,7 +77,8 @@ public class LeaderServiceTest {
@BeforeMethod
public void setup() throws PulsarClientException {
mockClient = mock(PulsarClientImpl.class);
-
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index e066bb24e6e..ac3176b3135 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.WorkerInfo;
@@ -68,7 +69,8 @@ public class MembershipManagerTest {
private static PulsarClient mockPulsarClient() throws
PulsarClientException {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
-
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);