This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 006559dbfa5 [fix][test] Fix multiple Pulsar client leaks in tests (#18635)
006559dbfa5 is described below
commit 006559dbfa5613e4b4b622416375e5bcbd1d580d
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Nov 26 02:40:47 2022 +0200
[fix][test] Fix multiple Pulsar client leaks in tests (#18635)
---
.../apache/pulsar/broker/admin/AdminApi2Test.java | 1 +
.../v3/AdminApiTransactionMultiBrokerTest.java | 3 ++
.../broker/admin/v3/AdminApiTransactionTest.java | 8 +--
.../service/ReplicatorRemoveClusterTest.java | 2 +
.../pulsar/broker/service/ReplicatorTest.java | 2 +
.../broker/service/SubscriptionSeekTest.java | 4 +-
.../broker/stats/TransactionMetricsTest.java | 56 +++++++++----------
.../systopic/PartitionedSystemTopicTest.java | 6 ++-
.../broker/transaction/TransactionConsumeTest.java | 6 ++-
.../pulsar/broker/transaction/TransactionTest.java | 1 +
.../buffer/TransactionStablePositionTest.java | 3 ++
.../TransactionCoordinatorConfigTest.java | 4 +-
.../pulsar/client/impl/ProducerCloseTest.java | 6 +--
.../client/impl/ProducerMemoryLimitTest.java | 5 +-
.../client/impl/ClientInitializationTest.java | 6 ++-
.../pulsar/client/impl/ConsumerImplTest.java | 2 +
.../io/kafka/connect/KafkaConnectSinkTest.java | 13 +++--
.../connect/PulsarOffsetBackingStoreTest.java | 8 +--
.../pulsar/proxy/server/ProxyRefreshAuthTest.java | 5 +-
.../proxy/server/ProxyRolesEnforcementTest.java | 62 +++++++++++-----------
.../server/ProxyWithJwtAuthorizationTest.java | 4 ++
.../org/apache/pulsar/sql/presto/PulsarAuth.java | 2 +
.../testclient/PerformanceTransactionTest.java | 2 +
.../pulsar/tests/integration/cli/CLITest.java | 1 +
24 files changed, 118 insertions(+), 94 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 6809b4402d7..7bb35cc4d63 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2661,6 +2661,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
URL pulsarUrl = new URL(pulsar.getWebServiceAddress());
admin.topics().createPartitionedTopic(partitionedTopicName,
startPartitions);
+ @Cleanup
PulsarClient client =
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
Consumer<byte[]> consumer1 =
client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
.subscriptionType(SubscriptionType.Shared).subscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
index 71be7086e76..463a65fc8ae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
@@ -62,6 +62,9 @@ public class AdminApiTransactionMultiBrokerTest extends
TransactionTestBase {
new PartitionedTopicMetadata(NUM_PARTITIONS));
map =
admin.lookups().lookupPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
}
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
+ }
//init tc stores
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index b57c20bc4f0..019b7c11fd5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -624,8 +624,9 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
}
admin.transactions().scaleTransactionCoordinators(coordinatorSize * 2);
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
pulsarClient.close();
+ pulsarClient = null;
Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size() ==
coordinatorSize * 2);
pulsar.getConfiguration().setAuthenticationEnabled(true);
@@ -821,11 +822,12 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(coordinatorSize));
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
pulsarClient.close();
+ pulsarClient = null;
Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== coordinatorSize);
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
}
private Transaction getTransaction() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
index e10da849cd3..c61efdff26d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -81,6 +82,7 @@ public class ReplicatorRemoveClusterTest extends
ReplicatorTestBase {
Assert.assertNotNull(repClient1);
Assert.assertFalse(repClient1.isClosed());
+ @Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 7f31ce39c96..28201c3c3df 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1225,6 +1225,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
final String topicMlName = namespace + "/persistent/cleanTopic";
admin1.namespaces().createNamespace(namespace,
Sets.newHashSet(cluster1, cluster2));
+ @Cleanup
PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0,
TimeUnit.SECONDS)
.build();
@@ -1350,6 +1351,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
admin1.topics().createNonPartitionedTopic(topic);
// Replicator will not replicate System Topic other than topic policies
initTransaction(2, admin1, pulsar1.getBrokerServiceUrl(), pulsar1);
+ @Cleanup
PulsarClient client =
PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl())
.enableTransaction(true).build();
TransactionImpl transaction = (TransactionImpl) client.newTransaction()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index d938e98a4b3..b6f1771c088 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -226,6 +227,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
assertEquals(batchMsgId0.getEntryId(), batchMsgId1.getEntryId());
assertEquals(batchMsgId1.getEntryId(), msgIdToSeekFirst.getEntryId());
+ @Cleanup
PulsarClient newPulsarClient = PulsarClient.builder()
// set start backoff interval short enough to make sure client
will re-connect quickly
.startingBackoffInterval(1, TimeUnit.MICROSECONDS)
@@ -261,8 +263,6 @@ public class SubscriptionSeekTest extends BrokerTestBase {
MessageId receiveId = consumer.receive().getMessageId();
assertEquals(receiveId, messageId);
}
-
- newPulsarClient.close();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index c35566278f6..766e0b90d69 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -18,11 +18,22 @@
*/
package org.apache.pulsar.broker.stats;
+import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.pulsar.broker.stats.PrometheusMetricsTest.parseMetrics;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.base.Splitter;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
@@ -50,18 +61,6 @@ import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static
org.apache.pulsar.broker.stats.PrometheusMetricsTest.parseMetrics;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
@Slf4j
public class TransactionMetricsTest extends BrokerTestBase {
@@ -82,6 +81,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
.build());
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
createTransactionCoordinatorAssign();
+
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
}
@AfterMethod(alwaysRun = true)
@@ -91,7 +91,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
}
@Test
- public void testTransactionCoordinatorMetrics() throws Exception{
+ public void testTransactionCoordinatorMetrics() throws Exception {
long timeout = 10000;
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
TransactionCoordinatorID transactionCoordinatorIDOne =
TransactionCoordinatorID.get(0);
@@ -123,7 +123,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
}
@Test
- public void testTransactionCoordinatorRateMetrics() throws Exception{
+ public void testTransactionCoordinatorRateMetrics() throws Exception {
int txnCount = 120;
String ns1 = "prop/ns-abc1";
admin.namespaces().createNamespace(ns1);
@@ -134,7 +134,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, subName, MessageId.earliest);
- Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
+ Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== 1);
@@ -142,7 +142,6 @@ public class TransactionMetricsTest extends BrokerTestBase {
.subscriptionName(subName).topic(topic).subscribe();
List<TxnID> list = new ArrayList<>();
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
for (int i = 0; i < txnCount; i++) {
TransactionImpl transaction =
(TransactionImpl) pulsarClient.newTransaction()
@@ -150,13 +149,13 @@ public class TransactionMetricsTest extends
BrokerTestBase {
TxnID txnID = new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits());
list.add(txnID);
if (i == 1) {
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
consumer.acknowledgeAsync(new MessageIdImpl(1000, 1000, -1),
transaction).get();
continue;
}
if (i % 2 == 0) {
-
pulsar.getTransactionMetadataStoreService().addProducedPartitionToTxn(list.get(i),
Collections.singletonList(topic)).get();
+ pulsar.getTransactionMetadataStoreService()
+ .addProducedPartitionToTxn(list.get(i),
Collections.singletonList(topic)).get();
} else {
pulsar.getTransactionMetadataStoreService().addAckedPartitionToTxn(list.get(i),
Collections.singletonList(TransactionSubscription.builder().topic(topic)
@@ -167,7 +166,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
for (int i = 0; i < txnCount; i++) {
if (i % 2 == 0) {
pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i),
TxnAction.COMMIT_VALUE,
- false).get();
+ false).get();
} else {
pulsar.getTransactionMetadataStoreService().endTransaction(list.get(i),
TxnAction.ABORT_VALUE,
false).get();
@@ -198,7 +197,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() -> {
try {
- pulsar.getTransactionMetadataStoreService()
+ pulsar.getTransactionMetadataStoreService()
.getStores().get(transactionCoordinatorIDOne).getTxnMeta(txnID).get();
} catch (Exception e) {
return true;
@@ -226,7 +225,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
}
@Test
- public void testManagedLedgerMetrics() throws Exception{
+ public void testManagedLedgerMetrics() throws Exception {
cleanup();
ServiceConfiguration serviceConfiguration = getDefaultConf();
serviceConfiguration.setTopicLevelPoliciesEnabled(false);
@@ -244,10 +243,9 @@ public class TransactionMetricsTest extends BrokerTestBase
{
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
admin.topics().createSubscription(topic, subName, MessageId.earliest);
- Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
+ Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== 1);
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.receiverQueueSize(10)
@@ -294,7 +292,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
}
@Test
- public void testManagedLedgerMetricsWhenPendingAckNotInit() throws
Exception{
+ public void testManagedLedgerMetricsWhenPendingAckNotInit() throws
Exception {
String ns1 = "prop/ns-abc1";
admin.namespaces().createNamespace(ns1);
String topic = "persistent://" + ns1 +
"/testManagedLedgerMetricsWhenPendingAckNotInit";
@@ -307,11 +305,9 @@ public class TransactionMetricsTest extends BrokerTestBase
{
admin.topics().createSubscription(topic, subName, MessageId.earliest);
admin.topics().createSubscription(topic, subName2, MessageId.earliest);
- Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
+ Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== 1);
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
-
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.receiverQueueSize(10)
@@ -361,7 +357,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
}
@Test
- public void testDuplicateMetricTypeDefinitions() throws Exception{
+ public void testDuplicateMetricTypeDefinitions() throws Exception {
admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());
TransactionCoordinatorID transactionCoordinatorIDOne =
TransactionCoordinatorID.get(0);
TransactionCoordinatorID transactionCoordinatorIDTwo =
TransactionCoordinatorID.get(1);
@@ -370,7 +366,6 @@ public class TransactionMetricsTest extends BrokerTestBase {
Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== 2);
- pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
Producer<byte[]> p1 = pulsarClient
.newProducer()
.topic("persistent://my-property/use/my-ns/my-topic1")
@@ -415,7 +410,8 @@ public class TransactionMetricsTest extends BrokerTestBase {
}
// From
https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
- // "The TYPE line for a metric name must appear before the
first sample is reported for that metric name."
+ // "The TYPE line for a metric name must appear before the
first sample is reported for that metric
+ // name."
if (metricNames.containsKey(metricName)) {
log.info(metricsStr);
fail("TYPE definition for " + metricName + " appears after
first sample");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index e79197bb1b6..5f1471e0a63 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -250,10 +250,12 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
Thread.sleep(3 * 1000);
try {
- Producer<String> producerN = PulsarClient.builder()
+ @Cleanup
+ PulsarClient pulsarClient1 = PulsarClient.builder()
.maxBackoffInterval(3, TimeUnit.SECONDS)
.operationTimeout(5, TimeUnit.SECONDS)
- .serviceUrl(lookupUrl.toString()).connectionTimeout(2,
TimeUnit.SECONDS).build()
+ .serviceUrl(lookupUrl.toString()).connectionTimeout(2,
TimeUnit.SECONDS).build();
+ Producer<String> producerN = pulsarClient1
.newProducer(Schema.STRING).topic(topic).sendTimeout(3,
TimeUnit.SECONDS).create();
Assert.assertTrue(producerN.isConnected());
producerN.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index c2bff3aec0d..78846fb7592 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -239,8 +239,10 @@ public class TransactionConsumeTest extends
TransactionTestBase {
throws ExecutionException, InterruptedException,
PulsarClientException {
//Change the state of TB to Ready.
@Cleanup
- Producer<String> producer =
PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
- .enableTransaction(true).build()
+ PulsarClient pulsarClient1 =
PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+ .enableTransaction(true).build();
+ @Cleanup
+ Producer<String> producer = pulsarClient1
.newProducer(Schema.STRING).topic(CONSUME_TOPIC).sendTimeout(0,
TimeUnit.SECONDS).create();
List<MessageIdData> positionList = new ArrayList<>();
for (int i = 0; i < transactionMsgCnt; i++) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 1dd6feb4762..f075b80ef7a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -606,6 +606,7 @@ public class TransactionTest extends TransactionTestBase {
.getTopic(topic, false).get().get();
TopicTransactionBuffer topicTransactionBuffer =
(TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+ @Cleanup
PulsarClient noTxnClient =
PulsarClient.builder().enableTransaction(false)
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()).build();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index 6d9933fe725..55d115905a3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -177,6 +177,9 @@ public class TransactionStablePositionTest extends
TransactionTestBase {
final String topicName = NAMESPACE1 +
"/testSyncNormalPositionWhenTBRecover-"
+ clientEnableTransaction + state.name();
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
+ }
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
index e2f4a6d75e6..a62d7eff5d3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
@@ -62,8 +62,8 @@ public class TransactionCoordinatorConfigTest extends
BrokerTestBase {
@Test
public void testMaxActiveTxn() throws Exception {
- pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
- .enableTransaction(true).operationTimeout(3,
TimeUnit.SECONDS).build();
+
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString())
+ .enableTransaction(true).operationTimeout(3,
TimeUnit.SECONDS));
// new two txn will not reach max active txns
Transaction commitTxn =
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
index a3048ff85c8..1141af88e72 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -121,6 +121,7 @@ public class ProducerCloseTest extends ProducerConsumerBase
{
@Test(timeOut = 10_000, dataProvider = "produceConf")
public void brokerCloseTopicTest(boolean enableBatch, boolean isAsyncSend)
throws Exception {
+ @Cleanup
PulsarClient longBackOffClient = PulsarClient.builder()
.startingBackoffInterval(5, TimeUnit.SECONDS)
.maxBackoffInterval(5, TimeUnit.SECONDS)
@@ -147,9 +148,8 @@ public class ProducerCloseTest extends ProducerConsumerBase
{
}
private void initClient() throws PulsarClientException {
- pulsarClient = PulsarClient.builder().
- serviceUrl(lookupUrl.toString())
- .build();
+ replacePulsarClient(PulsarClient.builder().
+ serviceUrl(lookupUrl.toString()));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 4f8e08a5f8e..e8e4355346e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -184,10 +184,9 @@ public class ProducerMemoryLimitTest extends
ProducerConsumerBase {
}
private void initClientWithMemoryLimit() throws PulsarClientException {
- pulsarClient = PulsarClient.builder().
+ replacePulsarClient(PulsarClient.builder().
serviceUrl(lookupUrl.toString())
- .memoryLimit(50, SizeUnit.KILO_BYTES)
- .build();
+ .memoryLimit(50, SizeUnit.KILO_BYTES));
}
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
index 36440b119d0..3b92f362ca1 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-
+import lombok.Cleanup;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -33,7 +33,9 @@ public class ClientInitializationTest {
public void testInitializeAuthWithTls() throws PulsarClientException {
Authentication auth = mock(Authentication.class);
- PulsarClient.builder()
+ @Cleanup
+ PulsarClient pulsarClient =
+ PulsarClient.builder()
.serviceUrl("pulsar+ssl://my-host:6650")
.authentication(auth)
.build();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 5798538bd4f..29d180f5f9a 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
@@ -228,6 +229,7 @@ public class ConsumerImplTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testCreateConsumerWhenSchemaIsNull() throws
PulsarClientException {
+ @Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://127.0.0.1:6650")
.build();
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 39cb63ca75f..fe2def25023 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
+import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
@@ -1095,9 +1096,11 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
// close the producer, open again
sink = new KafkaConnectSink();
- when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+ @Cleanup
+ PulsarClient pulsarClient1 = PulsarClient.builder()
.serviceUrl(brokerUrl.toString())
- .build());
+ .build();
+ when(context.getPulsarClient()).thenReturn(pulsarClient1);
sink.open(props, context);
// offset is 1 after reopening the producer
@@ -1225,9 +1228,11 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
// close the producer, open again
sink = new KafkaConnectSink();
- when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+ @Cleanup
+ PulsarClient pulsarClient1 = PulsarClient.builder()
.serviceUrl(brokerUrl.toString())
- .build());
+ .build();
+ when(context.getPulsarClient()).thenReturn(pulsarClient1);
sink.open(props, context);
// offset is 1 after reopening the producer
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
index ff7da7bddfa..69362cb2054 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStoreTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.io.kafka.connect;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
@@ -37,7 +36,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.util.Callback;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -52,7 +50,6 @@ public class PulsarOffsetBackingStoreTest extends
ProducerConsumerBase {
private PulsarKafkaWorkerConfig distributedConfig;
private String topicName;
private PulsarOffsetBackingStore offsetBackingStore;
- private PulsarClient client;
@BeforeMethod
@Override
@@ -62,10 +59,7 @@ public class PulsarOffsetBackingStoreTest extends
ProducerConsumerBase {
this.topicName = "persistent://my-property/my-ns/offset-topic";
this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
topicName);
- this.client = PulsarClient.builder()
- .serviceUrl(brokerUrl.toString())
- .build();
- this.offsetBackingStore = new PulsarOffsetBackingStore(client);
+ this.offsetBackingStore = new PulsarOffsetBackingStore(pulsarClient);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index b1c5b795ff1..d14105b0b43 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -158,9 +158,8 @@ public class ProxyRefreshAuthTest extends
ProducerConsumerBase {
return AuthTokenUtils.createToken(SECRET_KEY, "client",
Optional.of(calendar.getTime()));
});
- pulsarClient =
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
- .authentication(authenticationToken)
- .build();
+
replacePulsarClient(PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+ .authentication(authenticationToken));
String topic = "persistent://my-tenant/my-ns/my-topic1";
@Cleanup
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 6d3d8270fd0..2c8c382b6a5 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -19,9 +19,7 @@
package org.apache.pulsar.proxy.server;
import static org.mockito.Mockito.spy;
-
import com.google.common.collect.Sets;
-
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,9 +27,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
-
import javax.naming.AuthenticationException;
-
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -194,15 +190,16 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
- // Step 2: Try to use proxy Client as a normal Client - expect exception
- PulsarClient proxyClient =
createPulsarClient(pulsar.getBrokerServiceUrl(), proxyAuthParams);
boolean exceptionOccurred = false;
- try {
-
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
- } catch (Exception ex) {
- exceptionOccurred = true;
+ // Step 2: Try to use proxy Client as a normal Client - expect exception
+ try (PulsarClient proxyClient =
createPulsarClient(pulsar.getBrokerServiceUrl(), proxyAuthParams)) {
+ try {
+
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ } catch (Exception ex) {
+ exceptionOccurred = true;
+ }
+ Assert.assertTrue(exceptionOccurred);
}
- Assert.assertTrue(exceptionOccurred);
// Step 3: Run Pulsar Proxy and pass proxy params as client params - expect exception
ProxyConfiguration proxyConfig = new ProxyConfiguration();
@@ -219,26 +216,29 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
Set<String> providers = new HashSet<>();
providers.add(BasicAuthenticationProvider.class.getName());
proxyConfig.setAuthenticationProviders(providers);
- ProxyService proxyService = new ProxyService(proxyConfig,
- new AuthenticationService(
-
PulsarConfigurationLoader.convertFrom(proxyConfig)));
- proxyService.start();
-
- proxyClient = createPulsarClient(proxyService.getServiceUrl(),
proxyAuthParams);
- exceptionOccurred = false;
- try {
-
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
- } catch (Exception ex) {
- exceptionOccurred = true;
- }
-
- Assert.assertTrue(exceptionOccurred);
-
- // Step 4: Pass correct client params
- proxyClient = createPulsarClient(proxyService.getServiceUrl(),
clientAuthParams);
-
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
- proxyClient.close();
- proxyService.close();
+
+ try (ProxyService proxyService = new ProxyService(proxyConfig,
+ new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig)))) {
+ proxyService.start();
+
+
+ try (PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), proxyAuthParams)) {
+ exceptionOccurred = false;
+ try {
+
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ } catch (Exception ex) {
+ exceptionOccurred = true;
+ }
+
+ Assert.assertTrue(exceptionOccurred);
+ }
+
+ // Step 4: Pass correct client params
+ try (PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), clientAuthParams)) {
+
proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ }
+ }
}
private void createAdminClient() throws PulsarClientException {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 0253448e806..1fbd4b13f79 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import javax.crypto.SecretKey;
import javax.ws.rs.client.Client;
import javax.ws.rs.core.Response;
+import lombok.Cleanup;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
@@ -146,6 +147,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
startProxy();
createAdminClient();
+ @Cleanup
PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
String namespaceName = "my-property/proxy-authorization/my-ns";
@@ -226,6 +228,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
startProxy();
createAdminClient();
+ @Cleanup
PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
String clusterName = "proxy-authorization";
@@ -341,6 +344,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
startProxy();
createAdminClient();
+ @Cleanup
PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
String namespaceName = "my-property/proxy-authorization/my-ns";
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
index ede0e5fd490..3307faf9c21 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.Cleanup;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -98,6 +99,7 @@ public class PulsarAuth {
topic));
}
try {
+ @Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarConnectorConfig.getBrokerBinaryServiceUrl())
.authentication(authMethod, authParams)
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index d477797aab6..2ccdf62ff0e 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.testclient;
import com.google.common.collect.Sets;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -100,6 +101,7 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
String args = String.format(argString, testConsumeTopic,
testProduceTopic,
pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+ @Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.enableTransaction(true)
.serviceUrl(pulsar.getBrokerServiceUrl())
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 1c65d897fe9..7e8f5542924 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -83,6 +83,7 @@ public class CLITest extends PulsarTestSuite {
final String namespace = "public/" + namespaceLocalName;
assertEquals(0, result.getExitCode());
+ @Cleanup
PulsarClient client =
PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
final String persistentTopicName = TopicName.get(