This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3519fa0a694 [fix][test] Fix multiple thread leaks in Broker Group 1
unit tests (#21475)
3519fa0a694 is described below
commit 3519fa0a694765db5daa232d1986517f946b9f48
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Oct 30 22:04:58 2023 +0200
[fix][test] Fix multiple thread leaks in Broker Group 1 unit tests (#21475)
---
.../BucketDelayedDeliveryTrackerFactory.java | 3 +++
.../pulsar/broker/auth/AuthorizationTest.java | 2 ++
.../loadbalance/SimpleLoadManagerImplTest.java | 5 +++++
.../AntiAffinityNamespaceGroupExtensionTest.java | 1 +
.../pulsar/broker/service/MaxMessageSizeTest.java | 1 +
.../pulsar/broker/service/ReplicatorTest.java | 14 +++++++------
.../pulsar/broker/service/ReplicatorTestBase.java | 24 ++++++++++++++--------
.../pulsar/broker/transaction/TransactionTest.java | 8 ++++++--
.../MultiRolesTokenAuthorizationProviderTest.java | 3 ++-
9 files changed, 44 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 6a00bfd1995..157fda8acc6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -105,5 +105,8 @@ public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrack
if (bucketSnapshotStorage != null) {
bucketSnapshotStorage.close();
}
+ if (timer != null) {
+ timer.stop();
+ }
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 58cf4ee418e..01bfd03ceb8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.EnumSet;
+import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -283,6 +284,7 @@ public class AuthorizationTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().grantPermissionOnNamespace(namespaceV1,
"pass.pass2", EnumSet.of(AuthAction.produce));
admin.namespaces().createNamespace(namespaceV2, Sets.newHashSet("c1"));
admin.namespaces().grantPermissionOnNamespace(namespaceV2,
"pass.pass2", EnumSet.of(AuthAction.produce));
+ @Cleanup
PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(brokerUrl !=
null
? brokerUrl.toString()
: brokerUrlTls.toString())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 6303c70b4dc..7f2767b2e77 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarService;
@@ -188,6 +189,7 @@ public class SimpleLoadManagerImplTest {
@Test
public void testBasicBrokerSelection() throws Exception {
+ @Cleanup("stop")
SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(pulsar1);
PulsarResourceDescription rd = new PulsarResourceDescription();
rd.put("memory", new ResourceUsage(1024, 4096));
@@ -223,6 +225,7 @@ public class SimpleLoadManagerImplTest {
@Test
public void testPrimary() throws Exception {
createNamespacePolicies(pulsar1);
+ @Cleanup("stop")
SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(pulsar1);
PulsarResourceDescription rd = new PulsarResourceDescription();
rd.put("memory", new ResourceUsage(1024, 4096));
@@ -263,6 +266,7 @@ public class SimpleLoadManagerImplTest {
@Test(enabled = false)
public void testPrimarySecondary() throws Exception {
createNamespacePolicies(pulsar1);
+ @Cleanup("stop")
SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(pulsar1);
PulsarResourceDescription rd = new PulsarResourceDescription();
@@ -333,6 +337,7 @@ public class SimpleLoadManagerImplTest {
@Test(enabled = true)
public void testDoLoadShedding() throws Exception {
+ @Cleanup("stop")
SimpleLoadManagerImpl loadManager =
spyWithClassAndConstructorArgsRecordingInvocations(SimpleLoadManagerImpl.class,
pulsar1);
PulsarResourceDescription rd = new PulsarResourceDescription();
rd.put("memory", new ResourceUsage(1024, 4096));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
index 42fc12c2f99..d77490e1b82 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
@@ -109,6 +109,7 @@ public class AntiAffinityNamespaceGroupExtensionTest
extends AntiAffinityNamespa
final String antiAffinityEnabledNameSpace = namespace + nsSuffix;
admin.namespaces().createNamespace(antiAffinityEnabledNameSpace);
admin.namespaces().setNamespaceAntiAffinityGroup(antiAffinityEnabledNameSpace,
namespaceAntiAffinityGroup);
+ @Cleanup
PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(pulsar.getSafeWebServiceAddress()).build();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index fb15661fddf..780d33de521 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -93,6 +93,7 @@ public class MaxMessageSizeTest {
try {
pulsar.close();
bkEnsemble.stop();
+ admin.close();
} catch (Throwable t) {
t.printStackTrace();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 1139bb9e0bf..7e5cecd5796 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -666,8 +666,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/res-cons-id-"));
// Create another consumer using replication prefix as sub id
+ @Cleanup
MessageConsumer consumer = new MessageConsumer(url2, dest,
"pulsar.repl.");
- consumer.close();
} catch (Exception e) {
// SUCCESS
@@ -1714,13 +1714,15 @@ public class ReplicatorTest extends ReplicatorTestBase {
MessageIdImpl lastMessageId = (MessageIdImpl)
topic.getLastMessageId().get();
Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(),
lastMessageId.getEntryId());
- ConcurrentOpenHashMap<String, Replicator> replicators =
topic.getReplicators();
- PersistentReplicator replicator = (PersistentReplicator)
replicators.get("r2");
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30,
TimeUnit.SECONDS)
- .untilAsserted(() ->
assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
- replicator.getState()));
- assertEquals(replicator.getState(),
org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
+ .ignoreExceptions()
+ .untilAsserted(() -> {
+ ConcurrentOpenHashMap<String, Replicator> replicators =
topic.getReplicators();
+ PersistentReplicator replicator = (PersistentReplicator)
replicators.get("r2");
+
assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
+ replicator.getState());
+ });
// Make sure all the data has replicated to the remote cluster before
close the cursor.
Awaitility.await().untilAsserted(() ->
assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index b83e8ac9d2d..47c0f4e35e8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -365,12 +365,16 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
this.namespace = dest.getNamespace();
this.topicName = dest.toString();
client =
PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0,
TimeUnit.SECONDS).build();
- producer = client.newProducer()
- .topic(topicName)
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
-
+ try {
+ producer = client.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ } catch (Exception e) {
+ client.close();
+ throw e;
+ }
}
MessageProducer(URL url, final TopicName dest, boolean batch) throws
Exception {
@@ -383,8 +387,12 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
.enableBatching(batch)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.batchingMaxMessages(5);
- producer = producerBuilder.create();
-
+ try {
+ producer = producerBuilder.create();
+ } catch (Exception e) {
+ client.close();
+ throw e;
+ }
}
void produceBatch(int messages) throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ee7a2e2d0b1..b9a274f5479 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -874,9 +874,10 @@ public class TransactionTest extends TransactionTestBase {
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator =
new MLTransactionSequenceIdGenerator();
persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog =
- new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
+ spy(new MLTransactionLogImpl(new TransactionCoordinatorID(1),
null,
persistentTopic.getManagedLedger().getConfig(), new
TxnLogBufferedWriterConfig(),
- transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
+ transactionTimer, DISABLED_BUFFERED_WRITER_METRICS));
+ doAnswer(__ ->
CompletableFuture.completedFuture(null)).when(mlTransactionLog).closeAsync();
Class<MLTransactionLogImpl> mlTransactionLogClass =
MLTransactionLogImpl.class;
Field field = mlTransactionLogClass.getDeclaredField("cursor");
field.setAccessible(true);
@@ -890,6 +891,7 @@ public class TransactionTest extends TransactionTestBase {
doNothing().when(transactionRecoverTracker).handleCommittingAndAbortingTransaction();
TransactionTimeoutTracker timeoutTracker =
mock(TransactionTimeoutTracker.class);
doNothing().when(timeoutTracker).start();
+ @Cleanup("closeAsync")
MLTransactionMetadataStore metadataStore1 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker,
mlTransactionSequenceIdGenerator, 0L);
@@ -903,6 +905,7 @@ public class TransactionTest extends TransactionTestBase {
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+ @Cleanup("closeAsync")
MLTransactionMetadataStore metadataStore2 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
@@ -917,6 +920,7 @@ public class TransactionTest extends TransactionTestBase {
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+ @Cleanup("closeAsync")
MLTransactionMetadataStore metadataStore3 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker,
mlTransactionSequenceIdGenerator, 0L);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
index 0445ad27ca8..ef775f8f819 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -116,7 +117,7 @@ public class MultiRolesTokenAuthorizationProviderTest
extends MockedPulsarServic
);
}
- @BeforeClass
+ @AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();