This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 7b4f82b0222 [fix] [client] fix same producer/consumer use more than
one connection per broker (#21144)
7b4f82b0222 is described below
commit 7b4f82b02224d5758552da2d3860ed19b31ef811
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 11 21:06:05 2023 +0800
[fix] [client] fix same producer/consumer use more than one connection per
broker (#21144)
Motivation: Pulsar has two mechanisms to guarantee that the result is still
correct when a producer connects to the broker multiple times.
- In a connection, the second connection waits for the first connection to
complete.
- In a topic, the second connection will override the previous one.
However, if a producer can use different connections to connect to the
broker, these two mechanisms will not work.
When the config `connectionsPerBroker` of `PulsarClient` is larger than
`1`, a producer could use more than one connection, leading to the error above.
You can reproduce this issue with the test `testSelectConnectionForSameProducer`.
Modifications: Make the same producer/consumer use the same connection.
(cherry picked from commit f2b9a3fffd4aad414ba5f35ebff17502cb64eb12)
---
.../buffer/impl/TransactionBufferHandlerImpl.java | 8 +++-
.../broker/service/AbstractReplicatorTest.java | 4 ++
.../buffer/TransactionBufferClientTest.java | 19 ++++++++--
.../buffer/TransactionBufferHandlerImplTest.java | 16 ++++++--
.../pulsar/client/impl/ConnectionPoolTest.java | 43 ++++++++++++++++++++--
.../pulsar/client/impl/PulsarTestClient.java | 2 +-
.../apache/pulsar/compaction/CompactorTest.java | 8 +++-
.../pulsar/client/impl/ConnectionHandler.java | 4 +-
.../apache/pulsar/client/impl/ConnectionPool.java | 16 ++++++--
.../pulsar/client/impl/PulsarClientImpl.java | 17 +++++++--
.../impl/AcknowledgementsGroupingTrackerTest.java | 2 +
.../client/impl/AutoClusterFailoverTest.java | 19 +++++++---
.../client/impl/BatchMessageContainerImplTest.java | 2 +
.../client/impl/ConsumerBuilderImplTest.java | 2 +
.../client/impl/ControlledClusterFailoverTest.java | 3 ++
.../client/impl/PartitionedProducerImplTest.java | 2 +
.../client/impl/ProducerBuilderImplTest.java | 2 +
.../client/impl/ProducerStatsRecorderImplTest.java | 4 ++
.../pulsar/client/impl/PulsarClientImplTest.java | 3 +-
.../client/impl/UnAckedMessageTrackerTest.java | 2 +
.../MultiVersionSchemaInfoProviderTest.java | 3 ++
.../pulsar/functions/instance/ContextImplTest.java | 3 ++
.../pulsar/functions/sink/PulsarSinkTest.java | 4 ++
.../pulsar/functions/source/PulsarSourceTest.java | 7 +++-
.../pulsar/functions/worker/LeaderServiceTest.java | 4 +-
.../functions/worker/MembershipManagerTest.java | 4 +-
26 files changed, 171 insertions(+), 32 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 3f9a083787b..a0bfef3696a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -61,6 +61,8 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
private final PulsarService pulsarService;
private final PulsarClientImpl pulsarClient;
+ private final int randomKeyForSelectConnection;
+
private static final
AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER
=
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class,
"requestCredits");
private volatile int requestCredits;
@@ -74,6 +76,7 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
this.operationTimeoutInMills = operationTimeoutInMills;
this.timer = timer;
this.requestCredits = Math.max(100, maxConcurrentRequests);
+ this.randomKeyForSelectConnection =
pulsarClient.getCnxPool().genRandomKeyToSelectCon();
}
@Override
@@ -296,7 +299,7 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
}
public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
- return pulsarClient.getConnection(topic);
+ return pulsarClient.getConnection(topic, randomKeyForSelectConnection);
}
public CompletableFuture<ClientCnx> getClientCnx(String topic) {
@@ -317,7 +320,8 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
}
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
- return
pulsarClient.getConnection(brokerAddress, brokerAddress);
+ return
pulsarClient.getConnection(brokerAddress, brokerAddress,
+ randomKeyForSelectConnection);
} else {
// Bundle is unloading, lookup topic
return getClientCnxWithLookup(topic);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 018f3df6547..3c41161c3cd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
@@ -62,8 +63,11 @@ public class AbstractReplicatorTest {
final PulsarService pulsar = mock(PulsarService.class);
final BrokerService broker = mock(BrokerService.class);
final Topic localTopic = mock(Topic.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+ when(localClient.getCnxPool()).thenReturn(connectionPool);
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+ when(remoteClient.getCnxPool()).thenReturn(connectionPool);
final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
when(broker.executor()).thenReturn(eventLoopGroup);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 66460778dc2..430fd3bd1bb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
@@ -42,6 +44,7 @@ import
org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
@@ -151,11 +154,15 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
@Test
public void testTransactionBufferClientTimeout() throws Exception {
PulsarService pulsarService = pulsarServiceList.get(0);
- PulsarClient mockClient = mock(PulsarClientImpl.class);
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
CompletableFuture<ClientCnx> completableFuture = new
CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
+ when(((PulsarClientImpl)mockClient).getConnection(anyString(),
anyInt())).thenReturn(completableFuture);
+ when(((PulsarClientImpl)mockClient).getConnection(any(), any(),
anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
@@ -182,7 +189,9 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
ConcurrentSkipListMap<Long, Object> outstandingRequests =
(ConcurrentSkipListMap<Long, Object>)
field.get(transactionBufferHandler);
- assertEquals(outstandingRequests.size(), 1);
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(outstandingRequests.size(), 1);
+ });
Awaitility.await().atLeast(2, TimeUnit.SECONDS).until(() -> {
if (outstandingRequests.size() == 0) {
@@ -202,11 +211,13 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
@Test
public void testTransactionBufferChannelUnActive() throws
PulsarServerException {
PulsarService pulsarService = pulsarServiceList.get(0);
- PulsarClient mockClient = mock(PulsarClientImpl.class);
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
CompletableFuture<ClientCnx> completableFuture = new
CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
-
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
+ when(((PulsarClientImpl)mockClient).getConnection(anyString(),
anyInt())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index af4e442f617..c00505c3ced 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -24,6 +24,7 @@ import
org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -31,8 +32,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -46,7 +47,9 @@ public class TransactionBufferHandlerImplTest {
@Test
public void testRequestCredits() throws PulsarServerException {
- PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+ PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
PulsarService pulsarService = mock(PulsarService.class);
NamespaceService namespaceService = mock(NamespaceService.class);
when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
@@ -54,7 +57,10 @@ public class TransactionBufferHandlerImplTest {
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
Optional<NamespaceEphemeralData> opData = Optional.empty();
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
-
when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+ when(((PulsarClientImpl)pulsarClient).getConnection(anyString(),
anyInt()))
+
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+ when(((PulsarClientImpl)pulsarClient).getConnection(anyString()))
+
.thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
TransactionBufferHandlerImpl handler = spy(new
TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString());
@@ -75,7 +81,9 @@ public class TransactionBufferHandlerImplTest {
@Test
public void testMinRequestCredits() throws PulsarServerException {
- PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
PulsarService pulsarService = mock(PulsarService.class);
when(pulsarService.getClient()).thenReturn(pulsarClient);
TransactionBufferHandlerImpl handler = spy(new
TransactionBufferHandlerImpl(pulsarService, null, 50, 3000));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index f11ab69b32f..419007432d0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -32,9 +32,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import io.netty.util.concurrent.Promise;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -81,6 +85,36 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
eventLoop.shutdownGracefully();
}
+ @Test
+ public void testSelectConnectionForSameProducer() throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://sample/standalone/ns/tp_");
+ admin.topics().createNonPartitionedTopic(topicName);
+ final CommandCloseProducer commandCloseProducer = new
CommandCloseProducer();
+ // 10 connection per broker.
+ final PulsarClient clientWith10ConPerBroker =
PulsarClient.builder().connectionsPerBroker(10)
+ .serviceUrl(lookupUrl.toString()).build();
+ ProducerImpl producer = (ProducerImpl)
clientWith10ConPerBroker.newProducer().topic(topicName).create();
+ commandCloseProducer.setProducerId(producer.producerId);
+ // An error will be reported when the Producer reconnects using a
different connection.
+ // If no error is reported, the same connection was used when
reconnecting.
+ for (int i = 0; i < 20; i++) {
+ // Trigger reconnect
+ ClientCnx cnx = producer.getClientCnx();
+ if (cnx != null) {
+ cnx.handleCloseProducer(commandCloseProducer);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(producer.getState().toString(),
HandlerState.State.Ready.toString(),
+ "The producer uses a different connection when
reconnecting")
+ );
+ }
+ }
+
+ // cleanup.
+ producer.close();
+ clientWith10ConPerBroker.close();
+ admin.topics().delete(topicName);
+ }
+
@Test
public void testDoubleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
@@ -201,14 +235,16 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
ClientCnx cnx = pool.getConnection(
InetSocketAddress.createUnresolved("proxy", 9999),
- InetSocketAddress.createUnresolved("proxy", 9999)).get();
+ InetSocketAddress.createUnresolved("proxy", 9999),
+ pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);
cnx.close();
cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker", 9999),
- InetSocketAddress.createUnresolved("proxy", 9999)).get();
+ InetSocketAddress.createUnresolved("proxy", 9999),
+ pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "proxy");
Assert.assertEquals(cnx.proxyToTargetBrokerAddress, "broker:9999");
cnx.close();
@@ -216,7 +252,8 @@ public class ConnectionPoolTest extends
MockedPulsarServiceBaseTest {
cnx = pool.getConnection(
InetSocketAddress.createUnresolved("broker", 9999),
- InetSocketAddress.createUnresolved("broker", 9999)).get();
+ InetSocketAddress.createUnresolved("broker", 9999),
+ pool.genRandomKeyToSelectCon()).get();
Assert.assertEquals(cnx.remoteHostName, "broker");
Assert.assertNull(cnx.proxyToTargetBrokerAddress);
cnx.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index 2177d1b2db5..324539e018d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -122,7 +122,7 @@ public class PulsarTestClient extends PulsarClientImpl {
result.completeExceptionally(new IOException("New connections are
rejected."));
return result;
} else {
- return super.getConnection(topic);
+ return super.getConnection(topic,
getCnxPool().genRandomKeyToSelectCon());
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 304aae3f57f..e3a788f36c1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.compaction;
import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -47,6 +49,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -244,7 +247,10 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
public void testPhaseOneLoopTimeConfiguration() {
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
- TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration,
Mockito.mock(PulsarClientImpl.class),
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
+ TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration,
mockClient,
Mockito.mock(BookKeeper.class), compactionScheduler);
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(),
60);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 32a1b0d8240..251cafa5406 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -42,6 +42,7 @@ public class ConnectionHandler {
private volatile long epoch = -1L;
protected volatile long lastConnectionClosedTimestamp = 0L;
private final AtomicBoolean duringConnect = new AtomicBoolean(false);
+ protected final int randomKeyForSelectConnection;
interface Connection {
@@ -57,6 +58,7 @@ public class ConnectionHandler {
protected ConnectionHandler(HandlerState state, Backoff backoff,
Connection connection) {
this.state = state;
+ this.randomKeyForSelectConnection =
state.client.getCnxPool().genRandomKeyToSelectCon();
this.connection = connection;
this.backoff = backoff;
CLIENT_CNX_UPDATER.set(this, null);
@@ -83,7 +85,7 @@ public class ConnectionHandler {
}
try {
- state.client.getConnection(state.topic) //
+ state.client.getConnection(state.topic,
randomKeyForSelectConnection) //
.thenCompose(cnx -> connection.connectionOpened(cnx)) //
.thenAccept(__ -> duringConnect.set(false))
.exceptionally(this::handleConnectionError);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 659b54fe747..675d2773c67 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -127,8 +127,18 @@ public class ConnectionPool implements AutoCloseable {
private static final Random random = new Random();
+ public int genRandomKeyToSelectCon() {
+ if (maxConnectionsPerHosts == 0) {
+ return -1;
+ }
+ return signSafeMod(random.nextInt(), maxConnectionsPerHosts);
+ }
+
public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress
address) {
- return getConnection(address, address);
+ if (maxConnectionsPerHosts == 0) {
+ return getConnection(address, address, -1);
+ }
+ return getConnection(address, address, signSafeMod(random.nextInt(),
maxConnectionsPerHosts));
}
void closeAllConnections() {
@@ -166,14 +176,12 @@ public class ConnectionPool implements AutoCloseable {
* @return a future that will produce the ClientCnx object
*/
public CompletableFuture<ClientCnx> getConnection(InetSocketAddress
logicalAddress,
- InetSocketAddress physicalAddress) {
+ InetSocketAddress physicalAddress, final int randomKey) {
if (maxConnectionsPerHosts == 0) {
// Disable pooling
return createConnection(logicalAddress, physicalAddress, -1);
}
- final int randomKey = signSafeMod(random.nextInt(),
maxConnectionsPerHosts);
-
final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
pool.computeIfAbsent(logicalAddress, a -> new
ConcurrentHashMap<>());
CompletableFuture<ClientCnx> completableFuture = innerPool
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index a2e63c2cdc6..4c1a301fe3a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -898,15 +898,26 @@ public class PulsarClientImpl implements PulsarClient {
conf.setTlsTrustStorePassword(tlsTrustStorePassword);
}
+ public CompletableFuture<ClientCnx> getConnection(final String topic, int
randomKeyForSelectConnection) {
+ TopicName topicName = TopicName.get(topic);
+ return lookup.getBroker(topicName)
+ .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight(), randomKeyForSelectConnection));
+ }
+
+ /**
+ * Only for test.
+ */
+ @VisibleForTesting
public CompletableFuture<ClientCnx> getConnection(final String topic) {
TopicName topicName = TopicName.get(topic);
return lookup.getBroker(topicName)
- .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight()));
+ .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
}
public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress
logicalAddress,
- final InetSocketAddress
physicalAddress) {
- return cnxPool.getConnection(logicalAddress, physicalAddress);
+ final InetSocketAddress
physicalAddress,
+ final int
randomKeyForSelectConnection) {
+ return cnxPool.getConnection(logicalAddress, physicalAddress,
randomKeyForSelectConnection);
}
/** visible for pulsar-functions. **/
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index d577f48357c..81e51424230 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -64,6 +64,8 @@ public class AcknowledgementsGroupingTrackerTest {
ConcurrentOpenHashMap.<MessageIdImpl,
MessageIdImpl[]>newBuilder().build();
cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new
NioEventLoopGroup()));
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
doReturn(client).when(consumer).getClient();
doReturn(cnx).when(consumer).getClientCnx();
doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index e42d8edf34b..62d5309ef5c 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -34,8 +36,7 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertTrue;
@Test(groups = "broker-impl")
@Slf4j
@@ -121,6 +122,8 @@ public class AutoClusterFailoverTest {
AutoClusterFailover autoClusterFailover =
Mockito.spy((AutoClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -130,14 +133,14 @@ public class AutoClusterFailoverTest {
for (int i = 0; i < 2; i++) {
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(secondary,
autoClusterFailover.getServiceUrl()));
- assertEquals(-1, autoClusterFailover.getFailedTimestamp());
+ assertTrue(-1L == autoClusterFailover.getFailedTimestamp());
// primary cluster came back
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(primary);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(primary,
autoClusterFailover.getServiceUrl()));
- assertEquals(-1, autoClusterFailover.getRecoverTimestamp());
- assertEquals(-1, autoClusterFailover.getFailedTimestamp());
+ assertTrue(-1L == autoClusterFailover.getRecoverTimestamp());
+ assertTrue(-1L == autoClusterFailover.getFailedTimestamp());
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
}
@@ -163,6 +166,8 @@ public class AutoClusterFailoverTest {
AutoClusterFailover autoClusterFailover =
Mockito.spy((AutoClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -219,6 +224,8 @@ public class AutoClusterFailoverTest {
AutoClusterFailover autoClusterFailover =
Mockito.spy((AutoClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
@@ -274,6 +281,8 @@ public class AutoClusterFailoverTest {
AutoClusterFailover autoClusterFailover =
Mockito.spy((AutoClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 7241819cfa0..dc5273ba54f 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -79,6 +79,8 @@ public class BatchMessageContainerImplTest {
final ProducerConfigurationData producerConfigurationData = new
ProducerConfigurationData();
producerConfigurationData.setCompressionType(CompressionType.NONE);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
MemoryLimitController memoryLimitController =
mock(MemoryLimitController.class);
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
try {
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index ab53c6759b2..a2c76a8956e 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -72,6 +72,8 @@ public class ConsumerBuilderImplTest {
@BeforeMethod(alwaysRun = true)
public void setup() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
ConsumerConfigurationData consumerConfigurationData =
mock(ConsumerConfigurationData.class);
when(consumerConfigurationData.getTopicsPattern()).thenReturn(Pattern.compile("\\w+"));
when(consumerConfigurationData.getSubscriptionName()).thenReturn("testSubscriptionName");
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index fbd239e1137..974c31cc82e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -31,6 +31,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
@Test(groups = "broker-impl")
public class ControlledClusterFailoverTest {
@@ -88,6 +89,8 @@ public class ControlledClusterFailoverTest {
ControlledClusterFailover controlledClusterFailover =
Mockito.spy((ControlledClusterFailover) provider);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
controlledClusterFailover.initialize(pulsarClient);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index aceea081070..9cdcc8c031e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -70,6 +70,8 @@ public class PartitionedProducerImplTest {
@BeforeMethod(alwaysRun = true)
public void setup() {
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
schema = mock(Schema.class);
producerInterceptors = mock(ProducerInterceptors.class);
producerCreatedFuture = new CompletableFuture<>();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index c5ef1a9a9fd..9d6ec1f8344 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -53,6 +53,8 @@ public class ProducerBuilderImplTest {
public void setup() {
Producer producer = mock(Producer.class);
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
when(client.newProducer()).thenReturn(producerBuilderImpl);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index f6e7f284ce6..964a9b5558b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -40,6 +40,8 @@ public class ProducerStatsRecorderImplTest {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setStatsIntervalSeconds(1);
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.getConfiguration()).thenReturn(conf);
Timer timer = new HashedWheelTimer();
when(client.timer()).thenReturn(timer);
@@ -60,6 +62,8 @@ public class ProducerStatsRecorderImplTest {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setStatsIntervalSeconds(60);
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.getConfiguration()).thenReturn(conf);
Timer timer = new HashedWheelTimer();
when(client.timer()).thenReturn(timer);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index a506066df8b..c519941cf13 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -131,7 +132,7 @@ public class PulsarClientImplTest {
when(cnx.ctx()).thenReturn(ctx);
when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong()))
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
- when(pool.getConnection(any(InetSocketAddress.class),
any(InetSocketAddress.class)))
+ when(pool.getConnection(any(InetSocketAddress.class),
any(InetSocketAddress.class), anyInt()))
.thenReturn(CompletableFuture.completedFuture(cnx));
Whitebox.setInternalState(clientImpl, "cnxPool", pool);
Whitebox.setInternalState(clientImpl, "lookup", lookup);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
index 323333afc74..2e2cf29b2ca 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
@@ -42,6 +42,8 @@ public class UnAckedMessageTrackerTest {
@Test
public void testAddAndRemove() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
Timer timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
1, TimeUnit.MILLISECONDS);
when(client.timer()).thenReturn(timer);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index c81ff146004..1d739524f71 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -27,6 +27,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -46,6 +47,8 @@ public class MultiVersionSchemaInfoProviderTest {
@BeforeMethod
public void setup() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.getLookup()).thenReturn(mock(LookupService.class));
schemaProvider = new MultiVersionSchemaInfoProvider(
TopicName.get("persistent://public/default/my-topic"), client);
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 49bc4161c30..d0c189b5fbd 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBase;
@@ -93,6 +94,8 @@ public class ContextImplTest {
producer = mock(Producer.class);
client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.getCnxPool()).thenReturn(connectionPool);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client,
Schema.BYTES));
when(client.createProducerAsync(any(ProducerConfigurationData.class),
any(), any()))
.thenReturn(CompletableFuture.completedFuture(producer));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 5e52c89f5fc..936c67b37c0 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
@@ -59,6 +60,7 @@ import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -98,6 +100,8 @@ public class PulsarSinkTest {
*/
private static PulsarClientImpl getPulsarClient() throws
PulsarClientException {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 4ac7d53cb7a..745f0ee7ee3 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.functions.source;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertSame;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
@@ -45,6 +48,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
@@ -54,7 +58,6 @@ import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.io.core.SourceContext;
import org.testng.Assert;
import org.mockito.ArgumentMatcher;
-import static org.testng.Assert.assertSame;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
@@ -108,6 +111,8 @@ public class PulsarSourceTest {
*/
private static PulsarClientImpl getPulsarClient() throws
PulsarClientException {
PulsarClientImpl pulsarClient = Mockito.mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
ConsumerBuilder<?> goodConsumerBuilder =
Mockito.mock(ConsumerBuilder.class);
ConsumerBuilder<?> badConsumerBuilder =
Mockito.mock(ConsumerBuilder.class);
Mockito.doReturn(goodConsumerBuilder).when(goodConsumerBuilder).topics(Mockito.argThat(new
TopicMatcher("persistent://sample/ns1/test_result")));
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
index 57f2dca4922..e36825f1a7b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -78,7 +79,8 @@ public class LeaderServiceTest {
@BeforeMethod
public void setup() throws PulsarClientException {
mockClient = mock(PulsarClientImpl.class);
-
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 84832e74df8..66db2334389 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.WorkerInfo;
@@ -70,7 +71,8 @@ public class MembershipManagerTest {
private static PulsarClient mockPulsarClient() throws
PulsarClientException {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
-
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(mockClient.getCnxPool()).thenReturn(connectionPool);
ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);