This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 006559dbfa5 [fix][test] Fix multiple Pulsar client leaks in tests 
(#18635)
006559dbfa5 is described below

commit 006559dbfa5613e4b4b622416375e5bcbd1d580d
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Nov 26 02:40:47 2022 +0200

    [fix][test] Fix multiple Pulsar client leaks in tests (#18635)
---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  1 +
 .../v3/AdminApiTransactionMultiBrokerTest.java     |  3 ++
 .../broker/admin/v3/AdminApiTransactionTest.java   |  8 +--
 .../service/ReplicatorRemoveClusterTest.java       |  2 +
 .../pulsar/broker/service/ReplicatorTest.java      |  2 +
 .../broker/service/SubscriptionSeekTest.java       |  4 +-
 .../broker/stats/TransactionMetricsTest.java       | 56 +++++++++----------
 .../systopic/PartitionedSystemTopicTest.java       |  6 ++-
 .../broker/transaction/TransactionConsumeTest.java |  6 ++-
 .../pulsar/broker/transaction/TransactionTest.java |  1 +
 .../buffer/TransactionStablePositionTest.java      |  3 ++
 .../TransactionCoordinatorConfigTest.java          |  4 +-
 .../pulsar/client/impl/ProducerCloseTest.java      |  6 +--
 .../client/impl/ProducerMemoryLimitTest.java       |  5 +-
 .../client/impl/ClientInitializationTest.java      |  6 ++-
 .../pulsar/client/impl/ConsumerImplTest.java       |  2 +
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 13 +++--
 .../connect/PulsarOffsetBackingStoreTest.java      |  8 +--
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  |  5 +-
 .../proxy/server/ProxyRolesEnforcementTest.java    | 62 +++++++++++-----------
 .../server/ProxyWithJwtAuthorizationTest.java      |  4 ++
 .../org/apache/pulsar/sql/presto/PulsarAuth.java   |  2 +
 .../testclient/PerformanceTransactionTest.java     |  2 +
 .../pulsar/tests/integration/cli/CLITest.java      |  1 +
 24 files changed, 118 insertions(+), 94 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 6809b4402d7..7bb35cc4d63 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2661,6 +2661,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         URL pulsarUrl = new URL(pulsar.getWebServiceAddress());
 
         admin.topics().createPartitionedTopic(partitionedTopicName, 
startPartitions);
+        @Cleanup
         PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
         Consumer<byte[]> consumer1 = 
client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
index 71be7086e76..463a65fc8ae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
@@ -62,6 +62,9 @@ public class AdminApiTransactionMultiBrokerTest extends 
TransactionTestBase {
                             new PartitionedTopicMetadata(NUM_PARTITIONS));
             map = 
admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         }
+        if (pulsarClient != null) {
+            pulsarClient.shutdown();
+        }
         //init tc stores
         pulsarClient = PulsarClient.builder()
                 
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index b57c20bc4f0..019b7c11fd5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -624,8 +624,9 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         }
 
         admin.transactions().scaleTransactionCoordinators(coordinatorSize * 2);
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+        
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
         pulsarClient.close();
+        pulsarClient = null;
         Awaitility.await().until(() -> 
pulsar.getTransactionMetadataStoreService().getStores().size() ==
                         coordinatorSize * 2);
         pulsar.getConfiguration().setAuthenticationEnabled(true);
@@ -821,11 +822,12 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
                 
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                         new PartitionedTopicMetadata(coordinatorSize));
         
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+        
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
         pulsarClient.close();
+        pulsarClient = null;
         Awaitility.await().until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== coordinatorSize);
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+        
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
     }
 
     private Transaction getTransaction() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
index e10da849cd3..c61efdff26d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -81,6 +82,7 @@ public class ReplicatorRemoveClusterTest extends 
ReplicatorTestBase {
         Assert.assertNotNull(repClient1);
         Assert.assertFalse(repClient1.isClosed());
 
+        @Cleanup
         PulsarClient client = PulsarClient.builder()
                 .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
                 .build();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 7f31ce39c96..28201c3c3df 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1225,6 +1225,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String topicMlName = namespace + "/persistent/cleanTopic";
         admin1.namespaces().createNamespace(namespace, 
Sets.newHashSet(cluster1, cluster2));
 
+        @Cleanup
         PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, 
TimeUnit.SECONDS)
                 .build();
 
@@ -1350,6 +1351,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         admin1.topics().createNonPartitionedTopic(topic);
         // Replicator will not replicate System Topic other than topic policies
         initTransaction(2, admin1, pulsar1.getBrokerServiceUrl(), pulsar1);
+        @Cleanup
         PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl())
                 .enableTransaction(true).build();
         TransactionImpl transaction = (TransactionImpl) client.newTransaction()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index d938e98a4b3..b6f1771c088 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -226,6 +227,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(batchMsgId0.getEntryId(), batchMsgId1.getEntryId());
         assertEquals(batchMsgId1.getEntryId(), msgIdToSeekFirst.getEntryId());
 
+        @Cleanup
         PulsarClient newPulsarClient = PulsarClient.builder()
                 // set start backoff interval short enough to make sure client 
will re-connect quickly
                 .startingBackoffInterval(1, TimeUnit.MICROSECONDS)
@@ -261,8 +263,6 @@ public class SubscriptionSeekTest extends BrokerTestBase {
             MessageId receiveId = consumer.receive().getMessageId();
             assertEquals(receiveId, messageId);
         }
-
-        newPulsarClient.close();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index c35566278f6..766e0b90d69 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -18,11 +18,22 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.pulsar.broker.stats.PrometheusMetricsTest.parseMetrics;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
@@ -50,18 +61,6 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.pulsar.broker.stats.PrometheusMetricsTest.parseMetrics;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 
 @Slf4j
 public class TransactionMetricsTest extends BrokerTestBase {
@@ -82,6 +81,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
                         .build());
         
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign();
+        
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -91,7 +91,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
     }
 
     @Test
-    public void testTransactionCoordinatorMetrics() throws Exception{
+    public void testTransactionCoordinatorMetrics() throws Exception {
         long timeout = 10000;
         
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
@@ -123,7 +123,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
     }
 
     @Test
-    public void testTransactionCoordinatorRateMetrics() throws Exception{
+    public void testTransactionCoordinatorRateMetrics() throws Exception {
         int txnCount = 120;
         String ns1 = "prop/ns-abc1";
         admin.namespaces().createNamespace(ns1);
@@ -134,7 +134,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
         admin.topics().createNonPartitionedTopic(topic);
         admin.topics().createSubscription(topic, subName, MessageId.earliest);
-        Awaitility.await().atMost(2000,  TimeUnit.MILLISECONDS).until(() ->
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== 1);
 
 
@@ -142,7 +142,6 @@ public class TransactionMetricsTest extends BrokerTestBase {
                 .subscriptionName(subName).topic(topic).subscribe();
 
         List<TxnID> list = new ArrayList<>();
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
         for (int i = 0; i < txnCount; i++) {
             TransactionImpl transaction =
                     (TransactionImpl) pulsarClient.newTransaction()
@@ -150,13 +149,13 @@ public class TransactionMetricsTest extends 
BrokerTestBase {
             TxnID txnID = new TxnID(transaction.getTxnIdMostBits(), 
transaction.getTxnIdLeastBits());
             list.add(txnID);
             if (i == 1) {
-                pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
                 consumer.acknowledgeAsync(new MessageIdImpl(1000, 1000, -1), 
transaction).get();
                 continue;
             }
 
             if (i % 2 == 0) {
-                
pulsar.getTransactionMetadataStoreService().addProducedPartitionToTxn(list.get(i),
 Collections.singletonList(topic)).get();
+                pulsar.getTransactionMetadataStoreService()
+                        .addProducedPartitionToTxn(list.get(i), 
Collections.singletonList(topic)).get();
             } else {
                 
pulsar.getTransactionMetadataStoreService().addAckedPartitionToTxn(list.get(i),
                         
Collections.singletonList(TransactionSubscription.builder().topic(topic)
@@ -167,7 +166,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         for (int i = 0; i < txnCount; i++) {
             if (i % 2 == 0) {
                 
pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), 
TxnAction.COMMIT_VALUE,
-                                false).get();
+                        false).get();
             } else {
                 
pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i), 
TxnAction.ABORT_VALUE,
                         false).get();
@@ -198,7 +197,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
 
         Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() -> {
             try {
-               pulsar.getTransactionMetadataStoreService()
+                pulsar.getTransactionMetadataStoreService()
                         
.getStores().get(transactionCoordinatorIDOne).getTxnMeta(txnID).get();
             } catch (Exception e) {
                 return true;
@@ -226,7 +225,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
     }
 
     @Test
-    public void testManagedLedgerMetrics() throws Exception{
+    public void testManagedLedgerMetrics() throws Exception {
         cleanup();
         ServiceConfiguration serviceConfiguration = getDefaultConf();
         serviceConfiguration.setTopicLevelPoliciesEnabled(false);
@@ -244,10 +243,9 @@ public class TransactionMetricsTest extends BrokerTestBase 
{
         
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
         admin.topics().createSubscription(topic, subName, MessageId.earliest);
 
-        Awaitility.await().atMost(2000,  TimeUnit.MILLISECONDS).until(() ->
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== 1);
 
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
                 .receiverQueueSize(10)
@@ -294,7 +292,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
     }
 
     @Test
-    public void testManagedLedgerMetricsWhenPendingAckNotInit() throws 
Exception{
+    public void testManagedLedgerMetricsWhenPendingAckNotInit() throws 
Exception {
         String ns1 = "prop/ns-abc1";
         admin.namespaces().createNamespace(ns1);
         String topic = "persistent://" + ns1 + 
"/testManagedLedgerMetricsWhenPendingAckNotInit";
@@ -307,11 +305,9 @@ public class TransactionMetricsTest extends BrokerTestBase 
{
         admin.topics().createSubscription(topic, subName, MessageId.earliest);
         admin.topics().createSubscription(topic, subName2, MessageId.earliest);
 
-        Awaitility.await().atMost(2000,  TimeUnit.MILLISECONDS).until(() ->
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== 1);
 
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
-
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
                 .receiverQueueSize(10)
@@ -361,7 +357,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
     }
 
     @Test
-    public void testDuplicateMetricTypeDefinitions() throws Exception{
+    public void testDuplicateMetricTypeDefinitions() throws Exception {
         
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
         TransactionCoordinatorID transactionCoordinatorIDTwo = 
TransactionCoordinatorID.get(1);
@@ -370,7 +366,6 @@ public class TransactionMetricsTest extends BrokerTestBase {
 
         Awaitility.await().until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== 2);
-        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
         Producer<byte[]> p1 = pulsarClient
                 .newProducer()
                 .topic("persistent://my-property/use/my-ns/my-topic1")
@@ -415,7 +410,8 @@ public class TransactionMetricsTest extends BrokerTestBase {
 
                 }
                 // From 
https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
-                // "The TYPE line for a metric name must appear before the 
first sample is reported for that metric name."
+                // "The TYPE line for a metric name must appear before the 
first sample is reported for that metric
+                // name."
                 if (metricNames.containsKey(metricName)) {
                     log.info(metricsStr);
                     fail("TYPE definition for " + metricName + " appears after 
first sample");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index e79197bb1b6..5f1471e0a63 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -250,10 +250,12 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         Thread.sleep(3 * 1000);
 
         try {
-            Producer<String> producerN = PulsarClient.builder()
+            @Cleanup
+            PulsarClient pulsarClient1 = PulsarClient.builder()
                     .maxBackoffInterval(3, TimeUnit.SECONDS)
                     .operationTimeout(5, TimeUnit.SECONDS)
-                    .serviceUrl(lookupUrl.toString()).connectionTimeout(2, 
TimeUnit.SECONDS).build()
+                    .serviceUrl(lookupUrl.toString()).connectionTimeout(2, 
TimeUnit.SECONDS).build();
+            Producer<String> producerN = pulsarClient1
                     .newProducer(Schema.STRING).topic(topic).sendTimeout(3, 
TimeUnit.SECONDS).create();
             Assert.assertTrue(producerN.isConnected());
             producerN.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index c2bff3aec0d..78846fb7592 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -239,8 +239,10 @@ public class TransactionConsumeTest extends 
TransactionTestBase {
             throws ExecutionException, InterruptedException, 
PulsarClientException {
         //Change the state of TB to Ready.
         @Cleanup
-        Producer<String> producer = 
PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
-                .enableTransaction(true).build()
+        PulsarClient pulsarClient1 = 
PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .enableTransaction(true).build();
+        @Cleanup
+        Producer<String> producer = pulsarClient1
                 
.newProducer(Schema.STRING).topic(CONSUME_TOPIC).sendTimeout(0, 
TimeUnit.SECONDS).create();
         List<MessageIdData> positionList = new ArrayList<>();
         for (int i = 0; i < transactionMsgCnt; i++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 1dd6feb4762..f075b80ef7a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -606,6 +606,7 @@ public class TransactionTest extends TransactionTestBase {
                 .getTopic(topic, false).get().get();
 
         TopicTransactionBuffer topicTransactionBuffer = 
(TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        @Cleanup
         PulsarClient noTxnClient = 
PulsarClient.builder().enableTransaction(false)
                 
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()).build();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index 6d9933fe725..55d115905a3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -177,6 +177,9 @@ public class TransactionStablePositionTest extends 
TransactionTestBase {
 
         final String topicName = NAMESPACE1 + 
"/testSyncNormalPositionWhenTBRecover-"
                 + clientEnableTransaction + state.name();
+        if (pulsarClient != null) {
+            pulsarClient.shutdown();
+        }
         pulsarClient = PulsarClient.builder()
                 
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                 .statsInterval(0, TimeUnit.SECONDS)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
index e2f4a6d75e6..a62d7eff5d3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
@@ -62,8 +62,8 @@ public class TransactionCoordinatorConfigTest extends 
BrokerTestBase {
 
     @Test
     public void testMaxActiveTxn() throws Exception {
-        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
-                .enableTransaction(true).operationTimeout(3, 
TimeUnit.SECONDS).build();
+        
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString())
+                .enableTransaction(true).operationTimeout(3, 
TimeUnit.SECONDS));
 
         // new two txn will not reach max active txns
         Transaction commitTxn =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
index a3048ff85c8..1141af88e72 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -121,6 +121,7 @@ public class ProducerCloseTest extends ProducerConsumerBase 
{
 
     @Test(timeOut = 10_000, dataProvider = "produceConf")
     public void brokerCloseTopicTest(boolean enableBatch, boolean isAsyncSend) 
throws Exception {
+        @Cleanup
         PulsarClient longBackOffClient = PulsarClient.builder()
                 .startingBackoffInterval(5, TimeUnit.SECONDS)
                 .maxBackoffInterval(5, TimeUnit.SECONDS)
@@ -147,9 +148,8 @@ public class ProducerCloseTest extends ProducerConsumerBase 
{
     }
 
     private void initClient() throws PulsarClientException {
-        pulsarClient = PulsarClient.builder().
-                serviceUrl(lookupUrl.toString())
-                .build();
+        replacePulsarClient(PulsarClient.builder().
+                serviceUrl(lookupUrl.toString()));
     }
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 4f8e08a5f8e..e8e4355346e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -184,10 +184,9 @@ public class ProducerMemoryLimitTest extends 
ProducerConsumerBase {
     }
 
     private void initClientWithMemoryLimit() throws PulsarClientException {
-        pulsarClient = PulsarClient.builder().
+        replacePulsarClient(PulsarClient.builder().
                 serviceUrl(lookupUrl.toString())
-                .memoryLimit(50, SizeUnit.KILO_BYTES)
-                .build();
+                .memoryLimit(50, SizeUnit.KILO_BYTES));
     }
 
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
index 36440b119d0..3b92f362ca1 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
-
+import lombok.Cleanup;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -33,7 +33,9 @@ public class ClientInitializationTest {
     public void testInitializeAuthWithTls() throws PulsarClientException {
         Authentication auth = mock(Authentication.class);
 
-        PulsarClient.builder()
+        @Cleanup
+        PulsarClient pulsarClient =
+                PulsarClient.builder()
                 .serviceUrl("pulsar+ssl://my-host:6650")
                 .authentication(auth)
                 .build();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 5798538bd4f..29d180f5f9a 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Cleanup;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Messages;
@@ -228,6 +229,7 @@ public class ConsumerImplTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testCreateConsumerWhenSchemaIsNull() throws 
PulsarClientException {
+        @Cleanup
         PulsarClient client = PulsarClient.builder()
             .serviceUrl("pulsar://127.0.0.1:6650")
             .build();
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 39cb63ca75f..fe2def25023 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericData;
@@ -1095,9 +1096,11 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase {
 
         // close the producer, open again
         sink = new KafkaConnectSink();
-        when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+        @Cleanup
+        PulsarClient pulsarClient1 = PulsarClient.builder()
                 .serviceUrl(brokerUrl.toString())
-                .build());
+                .build();
+        when(context.getPulsarClient()).thenReturn(pulsarClient1);
         sink.open(props, context);
 
         // offset is 1 after reopening the producer
@@ -1225,9 +1228,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
 
         // close the producer, open again
         sink = new KafkaConnectSink();
-        when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+        @Cleanup
+        PulsarClient pulsarClient1 = PulsarClient.builder()
                 .serviceUrl(brokerUrl.toString())
-                .build());
+                .build();
+        when(context.getPulsarClient()).thenReturn(pulsarClient1);
         sink.open(props, context);
 
         // offset is 1 after reopening the producer
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index ff7da7bddfa..69362cb2054 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.io.kafka.connect;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import java.nio.ByteBuffer;
@@ -37,7 +36,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -52,7 +50,6 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
     private PulsarKafkaWorkerConfig distributedConfig;
     private String topicName;
     private PulsarOffsetBackingStore offsetBackingStore;
-    private PulsarClient client;
 
     @BeforeMethod
     @Override
@@ -62,10 +59,7 @@ public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
 
         this.topicName = "persistent://my-property/my-ns/offset-topic";
         this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, topicName);
-        this.client = PulsarClient.builder()
-                .serviceUrl(brokerUrl.toString())
-                .build();
-        this.offsetBackingStore = new PulsarOffsetBackingStore(client);
+        this.offsetBackingStore = new PulsarOffsetBackingStore(pulsarClient);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index b1c5b795ff1..d14105b0b43 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -158,9 +158,8 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase {
             return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
         });
 
-        pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
-                .authentication(authenticationToken)
-                .build();
+        replacePulsarClient(PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .authentication(authenticationToken));
 
         String topic = "persistent://my-tenant/my-ns/my-topic1";
         @Cleanup
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 6d3d8270fd0..2c8c382b6a5 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -19,9 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static org.mockito.Mockito.spy;
-
 import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,9 +27,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
-
 import javax.naming.AuthenticationException;
-
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -194,15 +190,16 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
         admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
                 Sets.newHashSet(AuthAction.consume, AuthAction.produce));
 
-        // Step 2: Try to use proxy Client as a normal Client - expect exception
-        PulsarClient proxyClient = createPulsarClient(pulsar.getBrokerServiceUrl(), proxyAuthParams);
         boolean exceptionOccurred = false;
-        try {
-            proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
-        } catch (Exception ex) {
-            exceptionOccurred = true;
+        // Step 2: Try to use proxy Client as a normal Client - expect exception
+        try (PulsarClient proxyClient = createPulsarClient(pulsar.getBrokerServiceUrl(), proxyAuthParams)) {
+            try {
+                proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+            } catch (Exception ex) {
+                exceptionOccurred = true;
+            }
+            Assert.assertTrue(exceptionOccurred);
         }
-        Assert.assertTrue(exceptionOccurred);
 
         // Step 3: Run Pulsar Proxy and pass proxy params as client params - expect exception
         ProxyConfiguration proxyConfig = new ProxyConfiguration();
@@ -219,26 +216,29 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
         Set<String> providers = new HashSet<>();
         providers.add(BasicAuthenticationProvider.class.getName());
         proxyConfig.setAuthenticationProviders(providers);
-        ProxyService proxyService = new ProxyService(proxyConfig,
-                                                     new AuthenticationService(
-                                                             PulsarConfigurationLoader.convertFrom(proxyConfig)));
-        proxyService.start();
-
-        proxyClient = createPulsarClient(proxyService.getServiceUrl(), proxyAuthParams);
-        exceptionOccurred = false;
-        try {
-            proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
-        } catch (Exception ex) {
-            exceptionOccurred = true;
-        }
-
-        Assert.assertTrue(exceptionOccurred);
-
-        // Step 4: Pass correct client params
-        proxyClient = createPulsarClient(proxyService.getServiceUrl(), clientAuthParams);
-        proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
-        proxyClient.close();
-        proxyService.close();
+
+        try (ProxyService proxyService = new ProxyService(proxyConfig,
+                new AuthenticationService(
+                        PulsarConfigurationLoader.convertFrom(proxyConfig)))) {
+            proxyService.start();
+
+
+            try (PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), proxyAuthParams)) {
+                exceptionOccurred = false;
+                try {
+                    proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+                } catch (Exception ex) {
+                    exceptionOccurred = true;
+                }
+
+                Assert.assertTrue(exceptionOccurred);
+            }
+
+            // Step 4: Pass correct client params
+            try (PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), clientAuthParams)) {
+                proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+            }
+        }
     }
 
     private void createAdminClient() throws PulsarClientException {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 0253448e806..1fbd4b13f79 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import javax.crypto.SecretKey;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.core.Response;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
@@ -146,6 +147,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
 
         startProxy();
         createAdminClient();
+        @Cleanup
         PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
 
         String namespaceName = "my-property/proxy-authorization/my-ns";
@@ -226,6 +228,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
 
         startProxy();
         createAdminClient();
+        @Cleanup
         PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
 
         String clusterName = "proxy-authorization";
@@ -341,6 +344,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
 
         startProxy();
         createAdminClient();
+        @Cleanup
         PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
 
         String namespaceName = "my-property/proxy-authorization/my-ns";
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
index ede0e5fd490..3307faf9c21 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import lombok.Cleanup;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -98,6 +99,7 @@ public class PulsarAuth {
                             topic));
         }
         try {
+            @Cleanup
             PulsarClient client = PulsarClient.builder()
                     .serviceUrl(pulsarConnectorConfig.getBrokerBinaryServiceUrl())
                     .authentication(authMethod, authParams)
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index d477797aab6..2ccdf62ff0e 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.testclient;
 import com.google.common.collect.Sets;
 import java.net.URL;
 import java.util.concurrent.CountDownLatch;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -100,6 +101,7 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
         String args = String.format(argString, testConsumeTopic, testProduceTopic,
                 pulsar.getBrokerServiceUrl(), testSub, new URL(pulsar.getWebServiceAddress()));
 
+        @Cleanup
         PulsarClient pulsarClient = PulsarClient.builder()
                 .enableTransaction(true)
                 .serviceUrl(pulsar.getBrokerServiceUrl())
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 1c65d897fe9..7e8f5542924 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -83,6 +83,7 @@ public class CLITest extends PulsarTestSuite {
         final String namespace = "public/" + namespaceLocalName;
         assertEquals(0, result.getExitCode());
 
+        @Cleanup
         PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
 
         final String persistentTopicName = TopicName.get(


Reply via email to