This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3519fa0a694 [fix][test] Fix multiple thread leaks in Broker Group 1 
unit tests (#21475)
3519fa0a694 is described below

commit 3519fa0a694765db5daa232d1986517f946b9f48
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Oct 30 22:04:58 2023 +0200

    [fix][test] Fix multiple thread leaks in Broker Group 1 unit tests (#21475)
---
 .../BucketDelayedDeliveryTrackerFactory.java       |  3 +++
 .../pulsar/broker/auth/AuthorizationTest.java      |  2 ++
 .../loadbalance/SimpleLoadManagerImplTest.java     |  5 +++++
 .../AntiAffinityNamespaceGroupExtensionTest.java   |  1 +
 .../pulsar/broker/service/MaxMessageSizeTest.java  |  1 +
 .../pulsar/broker/service/ReplicatorTest.java      | 14 +++++++------
 .../pulsar/broker/service/ReplicatorTestBase.java  | 24 ++++++++++++++--------
 .../pulsar/broker/transaction/TransactionTest.java |  8 ++++++--
 .../MultiRolesTokenAuthorizationProviderTest.java  |  3 ++-
 9 files changed, 44 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 6a00bfd1995..157fda8acc6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -105,5 +105,8 @@ public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrack
         if (bucketSnapshotStorage != null) {
             bucketSnapshotStorage.close();
         }
+        if (timer != null) {
+            timer.stop();
+        }
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 58cf4ee418e..01bfd03ceb8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.EnumSet;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -283,6 +284,7 @@ public class AuthorizationTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().grantPermissionOnNamespace(namespaceV1, 
"pass.pass2", EnumSet.of(AuthAction.produce));
         admin.namespaces().createNamespace(namespaceV2, Sets.newHashSet("c1"));
         admin.namespaces().grantPermissionOnNamespace(namespaceV2, 
"pass.pass2", EnumSet.of(AuthAction.produce));
+        @Cleanup
         PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != 
null
                         ? brokerUrl.toString()
                         : brokerUrlTls.toString())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 6303c70b4dc..7f2767b2e77 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.pulsar.broker.PulsarService;
@@ -188,6 +189,7 @@ public class SimpleLoadManagerImplTest {
 
     @Test
     public void testBasicBrokerSelection() throws Exception {
+        @Cleanup("stop")
         SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(pulsar1);
         PulsarResourceDescription rd = new PulsarResourceDescription();
         rd.put("memory", new ResourceUsage(1024, 4096));
@@ -223,6 +225,7 @@ public class SimpleLoadManagerImplTest {
     @Test
     public void testPrimary() throws Exception {
         createNamespacePolicies(pulsar1);
+        @Cleanup("stop")
         SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(pulsar1);
         PulsarResourceDescription rd = new PulsarResourceDescription();
         rd.put("memory", new ResourceUsage(1024, 4096));
@@ -263,6 +266,7 @@ public class SimpleLoadManagerImplTest {
     @Test(enabled = false)
     public void testPrimarySecondary() throws Exception {
         createNamespacePolicies(pulsar1);
+        @Cleanup("stop")
         SimpleLoadManagerImpl loadManager = new SimpleLoadManagerImpl(pulsar1);
 
         PulsarResourceDescription rd = new PulsarResourceDescription();
@@ -333,6 +337,7 @@ public class SimpleLoadManagerImplTest {
 
     @Test(enabled = true)
     public void testDoLoadShedding() throws Exception {
+        @Cleanup("stop")
         SimpleLoadManagerImpl loadManager = 
spyWithClassAndConstructorArgsRecordingInvocations(SimpleLoadManagerImpl.class, 
pulsar1);
         PulsarResourceDescription rd = new PulsarResourceDescription();
         rd.put("memory", new ResourceUsage(1024, 4096));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
index 42fc12c2f99..d77490e1b82 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
@@ -109,6 +109,7 @@ public class AntiAffinityNamespaceGroupExtensionTest 
extends AntiAffinityNamespa
         final String antiAffinityEnabledNameSpace = namespace + nsSuffix;
         admin.namespaces().createNamespace(antiAffinityEnabledNameSpace);
         
admin.namespaces().setNamespaceAntiAffinityGroup(antiAffinityEnabledNameSpace, 
namespaceAntiAffinityGroup);
+        @Cleanup
         PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar.getSafeWebServiceAddress()).build();
         @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index fb15661fddf..780d33de521 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -93,6 +93,7 @@ public class MaxMessageSizeTest {
         try {
             pulsar.close();
             bkEnsemble.stop();
+            admin.close();
         } catch (Throwable t) {
             t.printStackTrace();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 1139bb9e0bf..7e5cecd5796 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -666,8 +666,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
                     
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/res-cons-id-"));
 
             // Create another consumer using replication prefix as sub id
+            @Cleanup
             MessageConsumer consumer = new MessageConsumer(url2, dest, 
"pulsar.repl.");
-            consumer.close();
 
         } catch (Exception e) {
             // SUCCESS
@@ -1714,13 +1714,15 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         MessageIdImpl lastMessageId = (MessageIdImpl) 
topic.getLastMessageId().get();
         Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), 
lastMessageId.getEntryId());
-        ConcurrentOpenHashMap<String, Replicator> replicators = 
topic.getReplicators();
-        PersistentReplicator replicator = (PersistentReplicator) 
replicators.get("r2");
 
         Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, 
TimeUnit.SECONDS)
-                .untilAsserted(() -> 
assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
-                        replicator.getState()));
-        assertEquals(replicator.getState(), 
org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
+                .ignoreExceptions()
+                .untilAsserted(() -> {
+                    ConcurrentOpenHashMap<String, Replicator> replicators = 
topic.getReplicators();
+                    PersistentReplicator replicator = (PersistentReplicator) 
replicators.get("r2");
+                    
assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
+                            replicator.getState());
+                });
 
         // Make sure all the data has replicated to the remote cluster before 
close the cursor.
         Awaitility.await().untilAsserted(() -> 
assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index b83e8ac9d2d..47c0f4e35e8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -365,12 +365,16 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
             this.namespace = dest.getNamespace();
             this.topicName = dest.toString();
             client = 
PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();
-            producer = client.newProducer()
-                .topic(topicName)
-                .enableBatching(false)
-                .messageRoutingMode(MessageRoutingMode.SinglePartition)
-                .create();
-
+            try {
+                producer = client.newProducer()
+                        .topic(topicName)
+                        .enableBatching(false)
+                        .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                        .create();
+            } catch (Exception e) {
+                client.close();
+                throw e;
+            }
         }
 
         MessageProducer(URL url, final TopicName dest, boolean batch) throws 
Exception {
@@ -383,8 +387,12 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
                 .enableBatching(batch)
                 .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
                 .batchingMaxMessages(5);
-            producer = producerBuilder.create();
-
+            try {
+                producer = producerBuilder.create();
+            } catch (Exception e) {
+                client.close();
+                throw e;
+            }
         }
 
         void produceBatch(int messages) throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ee7a2e2d0b1..b9a274f5479 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -874,9 +874,10 @@ public class TransactionTest extends TransactionTestBase {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = 
new MLTransactionSequenceIdGenerator();
         
persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog =
-                new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
+                spy(new MLTransactionLogImpl(new TransactionCoordinatorID(1), 
null,
                         persistentTopic.getManagedLedger().getConfig(), new 
TxnLogBufferedWriterConfig(),
-                        transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
+                        transactionTimer, DISABLED_BUFFERED_WRITER_METRICS));
+        doAnswer(__ -> 
CompletableFuture.completedFuture(null)).when(mlTransactionLog).closeAsync();
         Class<MLTransactionLogImpl> mlTransactionLogClass = 
MLTransactionLogImpl.class;
         Field field = mlTransactionLogClass.getDeclaredField("cursor");
         field.setAccessible(true);
@@ -890,6 +891,7 @@ public class TransactionTest extends TransactionTestBase {
         
doNothing().when(transactionRecoverTracker).handleCommittingAndAbortingTransaction();
         TransactionTimeoutTracker timeoutTracker = 
mock(TransactionTimeoutTracker.class);
         doNothing().when(timeoutTracker).start();
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                         mlTransactionLog, timeoutTracker, 
mlTransactionSequenceIdGenerator, 0L);
@@ -903,6 +905,7 @@ public class TransactionTest extends TransactionTestBase {
             return null;
         }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
 
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
 
@@ -917,6 +920,7 @@ public class TransactionTest extends TransactionTestBase {
             return null;
         }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
 
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore metadataStore3 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                         mlTransactionLog, timeoutTracker, 
mlTransactionSequenceIdGenerator, 0L);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
index 0445ad27ca8..ef775f8f819 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -116,7 +117,7 @@ public class MultiRolesTokenAuthorizationProviderTest 
extends MockedPulsarServic
         );
     }
 
-    @BeforeClass
+    @AfterClass
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();

Reply via email to