This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 51202a68895 [fix][test] Fix multiple thread leaks in tests, part 3 (#21543)
51202a68895 is described below

commit 51202a6889582117bf790e9ff2325b9f3119510f
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 8 16:36:03 2023 +0200

    [fix][test] Fix multiple thread leaks in tests, part 3 (#21543)
---
 .../impl/ManagedLedgerFactoryShutdownTest.java     |   2 +
 .../authentication/AuthenticationProviderSasl.java |   4 +
 .../authentication/SaslAuthenticateTest.java       |  44 +--
 .../jetty/tls/JettySslContextFactoryTest.java      |  13 +-
 .../JettySslContextFactoryWithKeyStoreTest.java    |  13 +-
 .../pulsar/broker/service/BatchMessageTest.java    |   1 +
 .../client/impl/auth/AuthenticationSasl.java       |   6 +
 .../pulsar/common/sasl/TGTRefreshThread.java       |   2 +-
 .../instance/JavaInstanceRunnableTest.java         |  35 ++-
 .../windowing/WaterMarkEventGeneratorTest.java     |   2 +-
 .../worker/FunctionRuntimeManagerTest.java         |  14 +-
 .../functions/worker/MembershipManagerTest.java    |   5 +
 .../functions/worker/SchedulerManagerTest.java     |   1 +
 .../bookkeeper/PulsarRegistrationClientTest.java   |   1 +
 .../pulsar/testclient/PerformanceTransaction.java  | 303 +++++++++++----------
 .../pulsar/testclient/PerformanceProducerTest.java |   8 +-
 .../MLTransactionMetadataStoreTest.java            |  53 ++--
 .../pulsar/websocket/PingPongSupportTest.java      |  11 +-
 .../impl/FileSystemManagedLedgerOffloader.java     |   4 +
 .../offload/filesystem/FileStoreTestBase.java      |  26 +-
 .../impl/FileSystemManagedLedgerOffloaderTest.java |  16 ++
 21 files changed, 345 insertions(+), 219 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index c223490f1c7..1c9fb29066b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -130,6 +131,7 @@ public class ManagedLedgerFactoryShutdownTest {
 
         LedgerHandle ledgerHandle = mock(LedgerHandle.class);
         LedgerHandle newLedgerHandle = mock(LedgerHandle.class);
+        @Cleanup("shutdownNow")
         OrderedExecutor executor = 
OrderedExecutor.newBuilder().name("Test").build();
         given(bookKeeper.getMainWorkerPool()).willReturn(executor);
         doAnswer(inv -> {
diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index ed2d05ab722..2616f90c664 100644
--- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -125,6 +125,10 @@ public class AuthenticationProviderSasl implements 
AuthenticationProvider {
 
     @Override
     public void close() throws IOException {
+        if (jaasCredentialsContainer != null) {
+            jaasCredentialsContainer.close();
+            jaasCredentialsContainer = null;
+        }
     }
 
     @Override
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index f4a797d6a4a..ae282a49dc3 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -29,6 +29,7 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.nio.file.Files;
@@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
 import javax.security.auth.login.Configuration;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -77,7 +79,7 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
     private static Properties properties;
 
     private static String localHostname = "localhost";
-    private static Authentication authSasl;
+    private Authentication authSasl;
 
     @BeforeClass
     public static void startMiniKdc() throws Exception {
@@ -146,17 +148,11 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         System.setProperty("java.security.krb5.conf", 
krb5file.getAbsolutePath());
         Configuration.getConfiguration().refresh();
 
-        // Client config
-        Map<String, String> clientSaslConfig = new HashMap<>();
-        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
-        clientSaslConfig.put("serverType", "broker");
-        log.info("set client jaas section name: PulsarClient");
-        authSasl = 
AuthenticationFactory.create(AuthenticationSasl.class.getName(), 
clientSaslConfig);
-        log.info("created AuthenticationSasl");
+
     }
 
     @AfterClass(alwaysRun = true)
-    public static void stopMiniKdc() {
+    public static void stopMiniKdc() throws IOException {
         System.clearProperty("java.security.auth.login.config");
         System.clearProperty("java.security.krb5.conf");
         if (kdc != null) {
@@ -175,6 +171,14 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         // use http lookup to verify HttpClient works well.
         isTcpLookup = false;
 
+        // Client config
+        Map<String, String> clientSaslConfig = new HashMap<>();
+        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
+        clientSaslConfig.put("serverType", "broker");
+        log.info("set client jaas section name: PulsarClient");
+        authSasl = 
AuthenticationFactory.create(AuthenticationSasl.class.getName(), 
clientSaslConfig);
+        log.info("created AuthenticationSasl");
+
         conf.setAdvertisedAddress(localHostname);
         conf.setAuthenticationEnabled(true);
         conf.setSaslJaasClientAllowedIds(".*" + "client" + ".*");
@@ -187,9 +191,6 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         conf.setAuthenticationProviders(providers);
         conf.setClusterName("test");
         conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + 
kdc.getRealm()));
-        Map<String, String> clientSaslConfig = new HashMap<>();
-        clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
-        clientSaslConfig.put("serverType", "broker");
         
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
         conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
                 
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
@@ -307,8 +308,11 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
 
     @Test
     public void testSaslOnlyAuthFirstStage() throws Exception {
-        AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl) 
pulsar.getBrokerService()
-                
.getAuthenticationService().getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME);
+        @Cleanup
+        AuthenticationProviderSasl saslServer = new 
AuthenticationProviderSasl();
+        // The cache expiration time is set to 50ms. Residual auth info should 
be cleaned up
+        conf.setInflightSaslContextExpiryMs(50);
+        saslServer.initialize(conf);
 
         HttpServletRequest servletRequest = mock(HttpServletRequest.class);
         doReturn("Init").when(servletRequest).getHeader("State");
@@ -325,9 +329,6 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         field.setAccessible(true);
         Cache<Long, AuthenticationState> cache = (Cache<Long, 
AuthenticationState>) field.get(saslServer);
         assertEquals(cache.asMap().size(), 10);
-        // The cache expiration time is set to 1ms. Residual auth info should 
be cleaned up
-        conf.setInflightSaslContextExpiryMs(1);
-        saslServer.initialize(conf);
         // Add more auth info into memory
         for (int i = 0; i < 10; i++) {
             AuthenticationDataProvider dataProvider =  
authSasl.getAuthData("localhost");
@@ -339,7 +340,7 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         }
         long start = System.currentTimeMillis();
         while (true) {
-            if (System.currentTimeMillis() - start > 10_00) {
+            if (System.currentTimeMillis() - start > 1000) {
                 fail();
             }
             cache = (Cache<Long, AuthenticationState>) field.get(saslServer);
@@ -347,14 +348,14 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
             if (CollectionUtils.hasElements(cache.asMap())) {
                 break;
             }
-            Thread.yield();
+            Thread.sleep(5);
         }
     }
 
     @Test
     public void testMaxInflightContext() throws Exception {
-        AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl) 
pulsar.getBrokerService()
-                
.getAuthenticationService().getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME);
+        @Cleanup
+        AuthenticationProviderSasl saslServer = new 
AuthenticationProviderSasl();
         HttpServletRequest servletRequest = mock(HttpServletRequest.class);
         doReturn("Init").when(servletRequest).getHeader("State");
         conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
@@ -375,5 +376,4 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         //only 1 context was left in the memory
         assertEquals(cache.asMap().size(), 1);
     }
-
 }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java
index bf91dab14fe..2f0c8b627d5 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLHandshakeException;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.config.RegistryBuilder;
@@ -47,6 +48,7 @@ public class JettySslContextFactoryTest {
 
     @Test
     public void testJettyTlsServerTls() throws Exception {
+        @Cleanup("stop")
         Server server = new Server();
         List<ServerConnector> connectors = new ArrayList<>();
         SslContextFactory factory = 
JettySslContextFactory.createServerSslContext(
@@ -72,15 +74,15 @@ public class JettySslContextFactoryTest {
                 new SSLConnectionSocketFactory(getClientSslContext(), new 
NoopHostnameVerifier()));
         PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager(registryBuilder.build());
         httpClientBuilder.setConnectionManager(cm);
+        @Cleanup
         CloseableHttpClient httpClient = httpClientBuilder.build();
         HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
         httpClient.execute(httpGet);
-        httpClient.close();
-        server.stop();
     }
 
     @Test(expectedExceptions = SSLHandshakeException.class)
     public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
+        @Cleanup("stop")
         Server server = new Server();
         List<ServerConnector> connectors = new ArrayList<>();
         SslContextFactory factory = 
JettySslContextFactory.createServerSslContext(
@@ -110,15 +112,15 @@ public class JettySslContextFactoryTest {
                 new String[]{"TLSv1.2"}, null, new NoopHostnameVerifier()));
         PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager(registryBuilder.build());
         httpClientBuilder.setConnectionManager(cm);
+        @Cleanup
         CloseableHttpClient httpClient = httpClientBuilder.build();
         HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
         httpClient.execute(httpGet);
-        httpClient.close();
-        server.stop();
     }
 
     @Test(expectedExceptions = SSLHandshakeException.class)
     public void testJettyTlsServerInvalidCipher() throws Exception {
+        @Cleanup("stop")
         Server server = new Server();
         List<ServerConnector> connectors = new ArrayList<>();
         SslContextFactory factory = 
JettySslContextFactory.createServerSslContext(
@@ -154,11 +156,10 @@ public class JettySslContextFactoryTest {
                 new NoopHostnameVerifier()));
         PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager(registryBuilder.build());
         httpClientBuilder.setConnectionManager(cm);
+        @Cleanup
         CloseableHttpClient httpClient = httpClientBuilder.build();
         HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
         httpClient.execute(httpGet);
-        httpClient.close();
-        server.stop();
     }
 
     private static SSLContext getClientSslContext() throws 
GeneralSecurityException, IOException {
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java
index 1d41cd36841..f08f62c480c 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/jetty/tls/JettySslContextFactoryWithKeyStoreTest.java
@@ -30,6 +30,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLHandshakeException;
 import javax.net.ssl.TrustManagerFactory;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.config.RegistryBuilder;
@@ -62,6 +63,7 @@ public class JettySslContextFactoryWithKeyStoreTest {
 
     @Test
     public void testJettyTlsServerTls() throws Exception {
+        @Cleanup("stop")
         Server server = new Server();
         List<ServerConnector> connectors = new ArrayList<>();
         SslContextFactory.Server factory = 
JettySslContextFactory.createServerSslContextWithKeystore(null,
@@ -81,16 +83,16 @@ public class JettySslContextFactoryWithKeyStoreTest {
                 new SSLConnectionSocketFactory(getClientSslContext(), new 
NoopHostnameVerifier()));
         PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager(registryBuilder.build());
         httpClientBuilder.setConnectionManager(cm);
+        @Cleanup
         CloseableHttpClient httpClient = httpClientBuilder.build();
         HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
         httpClient.execute(httpGet);
-        httpClient.close();
-        server.stop();
     }
 
     @Test(expectedExceptions = SSLHandshakeException.class)
     public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
         Configurator.setRootLevel(Level.INFO);
+        @Cleanup("stop")
         Server server = new Server();
         List<ServerConnector> connectors = new ArrayList<>();
         SslContextFactory.Server factory = 
JettySslContextFactory.createServerSslContextWithKeystore(null,
@@ -114,15 +116,15 @@ public class JettySslContextFactoryWithKeyStoreTest {
                 new String[]{"TLSv1.2"}, null, new NoopHostnameVerifier()));
         PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager(registryBuilder.build());
         httpClientBuilder.setConnectionManager(cm);
+        @Cleanup
         CloseableHttpClient httpClient = httpClientBuilder.build();
         HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
         httpClient.execute(httpGet);
-        httpClient.close();
-        server.stop();
     }
 
     @Test(expectedExceptions = SSLHandshakeException.class)
     public void testJettyTlsServerInvalidCipher() throws Exception {
+        @Cleanup("stop")
         Server server = new Server();
         List<ServerConnector> connectors = new ArrayList<>();
         SslContextFactory.Server factory = 
JettySslContextFactory.createServerSslContextWithKeystore(null,
@@ -151,11 +153,10 @@ public class JettySslContextFactoryWithKeyStoreTest {
                 new NoopHostnameVerifier()));
         PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager(registryBuilder.build());
         httpClientBuilder.setConnectionManager(cm);
+        @Cleanup
         CloseableHttpClient httpClient = httpClientBuilder.build();
         HttpGet httpGet = new HttpGet("https://localhost:" + connector.getLocalPort());
         httpClient.execute(httpGet);
-        httpClient.close();
-        server.stop();
     }
 
     private static SSLContext getClientSslContext() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index a2d80b2ba60..c66eff2c8a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -768,6 +768,7 @@ public class BatchMessageTest extends BrokerTestBase {
         final Consumer<byte[]> myConsumer = 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscribe();
         // assertEquals(dispatcher.getTotalUnackedMessages(), 1);
+        @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newFixedThreadPool(10);
 
         final CountDownLatch latch = new CountDownLatch(numMsgs);
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index f51fb766f03..f7ec9b964c6 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -165,6 +165,12 @@ public class AuthenticationSasl implements Authentication, 
EncodedAuthentication
     public void close() throws IOException {
         if (client != null) {
             client.close();
+            client = null;
+        }
+        if (jaasCredentialsContainer != null) {
+            jaasCredentialsContainer.close();
+            jaasCredentialsContainer = null;
+            initializedJAAS = false;
         }
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
index aa4c8de4a9c..6a0a7448f75 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
@@ -97,7 +97,7 @@ public class TGTRefreshThread extends Thread {
     @Override
     public void run() {
         log.info("TGT refresh thread started.");
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             // renewal thread's main loop. if it exits from here, thread will 
exit.
             KerberosTicket tgt = getTGT();
             long now = System.currentTimeMillis();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 134e77a3b58..c83648132d4 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -60,10 +62,14 @@ import org.apache.pulsar.io.core.SourceContext;
 import org.awaitility.Awaitility;
 import org.jetbrains.annotations.NotNull;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 public class JavaInstanceRunnableTest {
+    private final List<AutoCloseable> closeables = new ArrayList<>();
 
     static class IntegerSerDe implements SerDe<Integer> {
         @Override
@@ -113,8 +119,10 @@ public class JavaInstanceRunnableTest {
                 .build();
         InstanceConfig config = createInstanceConfig(functionDetails);
         config.setClusterName("test-cluster");
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build();
+        registerCloseable(pulsarClient);
         return new JavaInstanceRunnable(config, clientBuilder,
-                
PulsarClient.builder().serviceUrl("pulsar://test-cluster:6650").build(), null, 
null, null, null, null,
+                pulsarClient, null, null, null, null, null,
                 Thread.currentThread().getContextClassLoader(), null);
     }
 
@@ -493,4 +501,29 @@ public class JavaInstanceRunnableTest {
                     Assert.assertFalse((boolean) 
getPrivateField(javaInstanceRunnable, "isInitialized"));
                 });
     }
+
+    @AfterClass
+    public void cleanupInstanceCache() {
+        InstanceCache.shutdown();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void cleanupCloseables() {
+        callCloseables(closeables);
+    }
+
+    protected <T extends AutoCloseable> T registerCloseable(T closeable) {
+        closeables.add(closeable);
+        return closeable;
+    }
+
+    private static void callCloseables(List<AutoCloseable> closeables) {
+        for (int i = closeables.size() - 1; i >= 0; i--) {
+            try {
+                closeables.get(i).close();
+            } catch (Exception e) {
+                log.error("Failure in calling close method", e);
+            }
+        }
+    }
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
index ce3109b852e..162c9ad51ce 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WaterMarkEventGeneratorTest.java
@@ -63,7 +63,7 @@ public class WaterMarkEventGeneratorTest {
 
     @AfterMethod(alwaysRun = true)
     public void tearDown() {
-//        waterMarkEventGenerator.shutdown();
+        waterMarkEventGenerator.shutdown();
         eventList.clear();
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index bc56b1766d3..c332b5e6461 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -45,6 +45,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.Functions;
@@ -113,6 +114,7 @@ public class FunctionRuntimeManagerTest {
             mockRuntimeFactory(runtimeFactoryMockedStatic);
 
             // test new assignment add functions
+            @Cleanup
             FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                     workerConfig,
                     workerService,
@@ -205,6 +207,7 @@ public class FunctionRuntimeManagerTest {
 
 
             // test new assignment delete functions
+            @Cleanup
             FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                     workerConfig,
                     workerService,
@@ -305,6 +308,7 @@ public class FunctionRuntimeManagerTest {
                 .mockStatic(RuntimeFactory.class);) {
             mockRuntimeFactory(runtimeFactoryMockedStatic);
             // test new assignment update functions
+            @Cleanup
             FunctionRuntimeManager functionRuntimeManager = new 
FunctionRuntimeManager(
                     workerConfig,
                     workerService,
@@ -444,6 +448,7 @@ public class FunctionRuntimeManagerTest {
             mockRuntimeFactory(runtimeFactoryMockedStatic);
 
             // test new assignment update functions
+            @Cleanup
             FunctionRuntimeManager functionRuntimeManager = new 
FunctionRuntimeManager(
                     workerConfig,
                     workerService,
@@ -638,6 +643,7 @@ public class FunctionRuntimeManagerTest {
             mockRuntimeFactory(runtimeFactoryMockedStatic);
 
             // test new assignment add functions
+            @Cleanup
             FunctionRuntimeManager functionRuntimeManager = new 
FunctionRuntimeManager(
                     workerConfig,
                     workerService,
@@ -729,6 +735,7 @@ public class FunctionRuntimeManagerTest {
 
 
             // test new assignment update functions
+            @Cleanup
             FunctionRuntimeManager functionRuntimeManager = new 
FunctionRuntimeManager(
                     workerConfig,
                     workerService,
@@ -856,7 +863,6 @@ public class FunctionRuntimeManagerTest {
                     mock(FunctionMetaDataManager.class),
                     mock(WorkerStatsManager.class),
                     mock(ErrorNotifier.class));
-
             fail();
         } catch (Exception e) {
             assertEquals(e.getMessage(), "A Function Runtime Factory needs to 
be set");
@@ -933,6 +939,7 @@ public class FunctionRuntimeManagerTest {
                 mockRuntimeFactory(runtimeFactoryMockedStatic);
 
 
+                @Cleanup
                 FunctionRuntimeManager functionRuntimeManager = new 
FunctionRuntimeManager(
                         workerConfig,
                         mock(PulsarWorkerService.class),
@@ -984,6 +991,7 @@ public class FunctionRuntimeManagerTest {
 
                 })) {
 
+            @Cleanup
             FunctionRuntimeManager functionRuntimeManager = new 
FunctionRuntimeManager(
                     workerConfig,
                     mock(PulsarWorkerService.class),
@@ -1013,6 +1021,7 @@ public class FunctionRuntimeManagerTest {
             workerConfig = new WorkerConfig();
             workerConfig.setProcessContainerFactory(processContainerFactory);
 
+            functionRuntimeManager.close();
             functionRuntimeManager = new FunctionRuntimeManager(
                     workerConfig,
                     mock(PulsarWorkerService.class),
@@ -1041,6 +1050,7 @@ public class FunctionRuntimeManagerTest {
             workerConfig.setThreadContainerFactory(threadContainerFactory);
             workerConfig.setPulsarServiceUrl(PULSAR_SERVICE_URL);
 
+            functionRuntimeManager.close();
             functionRuntimeManager = new FunctionRuntimeManager(
                     workerConfig,
                     mock(PulsarWorkerService.class),
@@ -1112,6 +1122,7 @@ public class FunctionRuntimeManagerTest {
                             
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
                             
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
 
+            @Cleanup
             FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                     workerConfig,
                     workerService,
@@ -1201,6 +1212,7 @@ public class FunctionRuntimeManagerTest {
                                 
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
                                 
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();
 
+                @Cleanup
                 FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                         workerConfig,
                         workerService,
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index ac3176b3135..66b831f8788 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -32,6 +32,7 @@ import static org.testng.Assert.assertNotNull;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import lombok.Cleanup;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -106,6 +107,7 @@ public class MembershipManagerTest {
         doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
+        @Cleanup
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                 workerConfig,
                 workerService,
@@ -181,6 +183,7 @@ public class MembershipManagerTest {
         doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
+        @Cleanup
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                 workerConfig,
                 workerService,
@@ -273,6 +276,7 @@ public class MembershipManagerTest {
         doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
+        @Cleanup
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                 workerConfig,
                 workerService,
@@ -357,6 +361,7 @@ public class MembershipManagerTest {
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
+        @Cleanup
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                 workerConfig,
                 workerService,
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 01d34d75945..6a8d15814f3 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -148,6 +148,7 @@ public class SchedulerManagerTest {
 
     @AfterMethod(alwaysRun = true)
     public void stop() {
+        schedulerManager.close();
         this.executor.shutdownNow();
     }
 
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 4dcbcda3d90..5660b3518f1 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -363,6 +363,7 @@ public class PulsarRegistrationClientTest extends 
BaseMetadataStoreTest {
         final String zksConnectionString = zks.getConnectionString();
         final String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
         // prepare registration manager
+        @Cleanup
         ZooKeeper zk = new ZooKeeper(zksConnectionString, 5000, null);
         final ServerConfiguration serverConfiguration = new 
ServerConfiguration();
         serverConfiguration.setZkLedgersRootPath(ledgersRoot);
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 3b422452d64..84aaba5fab6 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -227,27 +227,27 @@ public class PerformanceTransaction {
                 .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
                         .enableTransaction(!arguments.isDisableTransaction);
 
-        PulsarClient client = clientBuilder.build();
+        try (PulsarClient client = clientBuilder.build()) {
 
-        ExecutorService executorService = new 
ThreadPoolExecutor(arguments.numTestThreads,
-                arguments.numTestThreads,
-                0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<>());
+            ExecutorService executorService = new 
ThreadPoolExecutor(arguments.numTestThreads,
+                    arguments.numTestThreads,
+                    0L, TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<>());
 
 
-        long startTime = System.nanoTime();
-        long testEndTime = startTime + (long) (arguments.testTime * 1e9);
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            if (!arguments.isDisableTransaction) {
-                printTxnAggregatedThroughput(startTime);
-            } else {
-                printAggregatedThroughput(startTime);
-            }
-            printAggregatedStats();
-        }));
+            long startTime = System.nanoTime();
+            long testEndTime = startTime + (long) (arguments.testTime * 1e9);
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                if (!arguments.isDisableTransaction) {
+                    printTxnAggregatedThroughput(startTime);
+                } else {
+                    printAggregatedThroughput(startTime);
+                }
+                printAggregatedStats();
+            }));
 
-        // start perf test
-        AtomicBoolean executing = new AtomicBoolean(true);
+            // start perf test
+            AtomicBoolean executing = new AtomicBoolean(true);
 
             RateLimiter rateLimiter = arguments.openTxnRate > 0
                     ? RateLimiter.create(arguments.openTxnRate)
@@ -304,97 +304,98 @@ public class PerformanceTransaction {
                         }
                         Transaction transaction = atomicReference.get();
                         for (List<Consumer<byte[]>> subscriptions : consumers) 
{
-                                for (Consumer<byte[]> consumer : 
subscriptions) {
-                                    for (int j = 0; j < 
arguments.numMessagesReceivedPerTransaction; j++) {
-                                        Message<byte[]> message = null;
-                                        try {
-                                            message = consumer.receive();
-                                        } catch (PulsarClientException e) {
-                                            log.error("Receive message 
failed", e);
-                                            executorService.shutdownNow();
-                                            PerfClientUtils.exit(1);
-                                        }
-                                        long receiveTime = System.nanoTime();
-                                        if (!arguments.isDisableTransaction) {
-                                            
consumer.acknowledgeAsync(message.getMessageId(), transaction)
-                                                    .thenRun(() -> {
-                                                        long latencyMicros = 
NANOSECONDS.toMicros(
-                                                                
System.nanoTime() - receiveTime);
-                                                        
messageAckRecorder.recordValue(latencyMicros);
-                                                        
messageAckCumulativeRecorder.recordValue(latencyMicros);
-                                                        
numMessagesAckSuccess.increment();
-                                                    }).exceptionally(exception 
-> {
-                                                if (exception instanceof 
InterruptedException && !executing.get()) {
-                                                    return null;
-                                                }
-                                                log.error(
-                                                        "Ack message failed 
with transaction {} throw exception",
-                                                        transaction, 
exception);
-                                                
numMessagesAckFailed.increment();
-                                                return null;
-                                            });
-                                        } else {
-                                            
consumer.acknowledgeAsync(message).thenRun(() -> {
-                                                long latencyMicros = 
NANOSECONDS.toMicros(
-                                                        System.nanoTime() - 
receiveTime);
-                                                
messageAckRecorder.recordValue(latencyMicros);
-                                                
messageAckCumulativeRecorder.recordValue(latencyMicros);
-                                                
numMessagesAckSuccess.increment();
-                                            }).exceptionally(exception -> {
-                                                if (exception instanceof 
InterruptedException && !executing.get()) {
+                            for (Consumer<byte[]> consumer : subscriptions) {
+                                for (int j = 0; j < 
arguments.numMessagesReceivedPerTransaction; j++) {
+                                    Message<byte[]> message = null;
+                                    try {
+                                        message = consumer.receive();
+                                    } catch (PulsarClientException e) {
+                                        log.error("Receive message failed", e);
+                                        executorService.shutdownNow();
+                                        PerfClientUtils.exit(1);
+                                    }
+                                    long receiveTime = System.nanoTime();
+                                    if (!arguments.isDisableTransaction) {
+                                        
consumer.acknowledgeAsync(message.getMessageId(), transaction)
+                                                .thenRun(() -> {
+                                                    long latencyMicros = 
NANOSECONDS.toMicros(
+                                                            System.nanoTime() 
- receiveTime);
+                                                    
messageAckRecorder.recordValue(latencyMicros);
+                                                    
messageAckCumulativeRecorder.recordValue(latencyMicros);
+                                                    
numMessagesAckSuccess.increment();
+                                                }).exceptionally(exception -> {
+                                                    if (exception instanceof 
InterruptedException && !executing.get()) {
+                                                        return null;
+                                                    }
+                                                    log.error(
+                                                            "Ack message 
failed with transaction {} throw exception",
+                                                            transaction, 
exception);
+                                                    
numMessagesAckFailed.increment();
                                                     return null;
-                                                }
-                                                log.error(
-                                                        "Ack message failed 
with transaction {} throw exception",
-                                                        transaction, 
exception);
-                                                
numMessagesAckFailed.increment();
+                                                });
+                                    } else {
+                                        
consumer.acknowledgeAsync(message).thenRun(() -> {
+                                            long latencyMicros = 
NANOSECONDS.toMicros(
+                                                    System.nanoTime() - 
receiveTime);
+                                            
messageAckRecorder.recordValue(latencyMicros);
+                                            
messageAckCumulativeRecorder.recordValue(latencyMicros);
+                                            numMessagesAckSuccess.increment();
+                                        }).exceptionally(exception -> {
+                                            if (exception instanceof 
InterruptedException && !executing.get()) {
                                                 return null;
-                                            });
-                                        }
+                                            }
+                                            log.error(
+                                                    "Ack message failed with 
transaction {} throw exception",
+                                                    transaction, exception);
+                                            numMessagesAckFailed.increment();
+                                            return null;
+                                        });
+                                    }
                                 }
                             }
                         }
 
-                        for (Producer<byte[]> producer : producers){
+                        for (Producer<byte[]> producer : producers) {
                             for (int j = 0; j < 
arguments.numMessagesProducedPerTransaction; j++) {
                                 long sendTime = System.nanoTime();
                                 if (!arguments.isDisableTransaction) {
                                     
producer.newMessage(transaction).value(payloadBytes)
                                             .sendAsync().thenRun(() -> {
-                                        long latencyMicros = 
NANOSECONDS.toMicros(
-                                                System.nanoTime() - sendTime);
-                                        
messageSendRecorder.recordValue(latencyMicros);
-                                        
messageSendRCumulativeRecorder.recordValue(latencyMicros);
-                                        numMessagesSendSuccess.increment();
-                                    }).exceptionally(exception -> {
-                                        if (exception instanceof 
InterruptedException && !executing.get()) {
-                                            return null;
-                                        }
-                                        log.error("Send transaction message 
failed with exception : ", exception);
-                                        numMessagesSendFailed.increment();
-                                        return null;
-                                    });
+                                                long latencyMicros = 
NANOSECONDS.toMicros(
+                                                        System.nanoTime() - 
sendTime);
+                                                
messageSendRecorder.recordValue(latencyMicros);
+                                                
messageSendRCumulativeRecorder.recordValue(latencyMicros);
+                                                
numMessagesSendSuccess.increment();
+                                            }).exceptionally(exception -> {
+                                                if (exception instanceof 
InterruptedException && !executing.get()) {
+                                                    return null;
+                                                }
+                                                log.error("Send transaction 
message failed with exception : ",
+                                                        exception);
+                                                
numMessagesSendFailed.increment();
+                                                return null;
+                                            });
                                 } else {
                                     producer.newMessage().value(payloadBytes)
                                             .sendAsync().thenRun(() -> {
-                                        long latencyMicros = 
NANOSECONDS.toMicros(
-                                                System.nanoTime() - sendTime);
-                                        
messageSendRecorder.recordValue(latencyMicros);
-                                        
messageSendRCumulativeRecorder.recordValue(latencyMicros);
-                                        numMessagesSendSuccess.increment();
-                                    }).exceptionally(exception -> {
-                                        if (exception instanceof 
InterruptedException && !executing.get()) {
-                                            return null;
-                                        }
-                                        log.error("Send message failed with 
exception : ", exception);
-                                        numMessagesSendFailed.increment();
-                                        return null;
-                                    });
+                                                long latencyMicros = 
NANOSECONDS.toMicros(
+                                                        System.nanoTime() - 
sendTime);
+                                                
messageSendRecorder.recordValue(latencyMicros);
+                                                
messageSendRCumulativeRecorder.recordValue(latencyMicros);
+                                                
numMessagesSendSuccess.increment();
+                                            }).exceptionally(exception -> {
+                                                if (exception instanceof 
InterruptedException && !executing.get()) {
+                                                    return null;
+                                                }
+                                                log.error("Send message failed 
with exception : ", exception);
+                                                
numMessagesSendFailed.increment();
+                                                return null;
+                                            });
                                 }
                             }
                         }
 
-                        if (rateLimiter != null){
+                        if (rateLimiter != null) {
                             rateLimiter.tryAcquire();
                         }
                         if (!arguments.isDisableTransaction) {
@@ -437,13 +438,13 @@ public class PerformanceTransaction {
                                     atomicReference.compareAndSet(transaction, 
newTransaction);
                                     totalNumTxnOpenTxnSuccess.increment();
                                     break;
-                                    } catch (Exception throwable){
-                                        if (throwable instanceof 
InterruptedException && !executing.get()) {
-                                            break;
-                                        }
-                                        log.error("Failed to new transaction 
with exception: ", throwable);
-                                        totalNumTxnOpenTxnFail.increment();
+                                } catch (Exception throwable) {
+                                    if (throwable instanceof 
InterruptedException && !executing.get()) {
+                                        break;
                                     }
+                                    log.error("Failed to new transaction with 
exception: ", throwable);
+                                    totalNumTxnOpenTxnFail.increment();
+                                }
                             }
                         } else {
                             totalNumTxnOpenTxnSuccess.increment();
@@ -455,68 +456,68 @@ public class PerformanceTransaction {
             }
 
 
+            // Print report stats
+            long oldTime = System.nanoTime();
 
-        // Print report stats
-        long oldTime = System.nanoTime();
-
-        Histogram reportSendHistogram = null;
-        Histogram reportAckHistogram = null;
+            Histogram reportSendHistogram = null;
+            Histogram reportAckHistogram = null;
 
-        String statsFileName = "perf-transaction-" + 
System.currentTimeMillis() + ".hgrm";
-        log.info("Dumping latency stats to {}", statsFileName);
+            String statsFileName = "perf-transaction-" + 
System.currentTimeMillis() + ".hgrm";
+            log.info("Dumping latency stats to {}", statsFileName);
 
-        PrintStream histogramLog = new PrintStream(new 
FileOutputStream(statsFileName), false);
-        HistogramLogWriter histogramLogWriter = new 
HistogramLogWriter(histogramLog);
+            PrintStream histogramLog = new PrintStream(new 
FileOutputStream(statsFileName), false);
+            HistogramLogWriter histogramLogWriter = new 
HistogramLogWriter(histogramLog);
 
-        // Some log header bits
-        histogramLogWriter.outputLogFormatVersion();
-        histogramLogWriter.outputLegend();
+            // Some log header bits
+            histogramLogWriter.outputLogFormatVersion();
+            histogramLogWriter.outputLegend();
 
-        while (executing.get()) {
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException e) {
-                break;
+            while (executing.get()) {
+                try {
+                    Thread.sleep(10000);
+                } catch (InterruptedException e) {
+                    break;
+                }
+                long now = System.nanoTime();
+                double elapsed = (now - oldTime) / 1e9;
+                long total = totalNumEndTxnOpFailed.sum() + 
totalNumTxnOpenTxnSuccess.sum();
+                double rate = numTxnOpSuccess.sumThenReset() / elapsed;
+                reportSendHistogram = 
messageSendRecorder.getIntervalHistogram(reportSendHistogram);
+                reportAckHistogram = 
messageAckRecorder.getIntervalHistogram(reportAckHistogram);
+                String txnOrTaskLog = !arguments.isDisableTransaction
+                        ? "Throughput transaction: {} transaction executes --- 
{} transaction/s"
+                        : "Throughput task: {} task executes --- {} task/s";
+                log.info(
+                        txnOrTaskLog + "  --- send Latency: mean: {} ms - med: 
{} "
+                                + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 
99.99pct: {} - Max: {}"
+                                + " --- ack Latency: "
+                                + "mean: {} ms - med: {} - 95pct: {} - 99pct: 
{} - 99.9pct: {} - 99.99pct: {} - Max: "
+                                + "{}",
+                        INTFORMAT.format(total),
+                        DEC.format(rate),
+                        DEC.format(reportSendHistogram.getMean() / 1000.0),
+                        
DEC.format(reportSendHistogram.getValueAtPercentile(50) / 1000.0),
+                        
DEC.format(reportSendHistogram.getValueAtPercentile(95) / 1000.0),
+                        
DEC.format(reportSendHistogram.getValueAtPercentile(99) / 1000.0),
+                        
DEC.format(reportSendHistogram.getValueAtPercentile(99.9) / 1000.0),
+                        
DEC.format(reportSendHistogram.getValueAtPercentile(99.99) / 1000.0),
+                        DEC.format(reportSendHistogram.getMaxValue() / 1000.0),
+                        DEC.format(reportAckHistogram.getMean() / 1000.0),
+                        DEC.format(reportAckHistogram.getValueAtPercentile(50) 
/ 1000.0),
+                        DEC.format(reportAckHistogram.getValueAtPercentile(95) 
/ 1000.0),
+                        DEC.format(reportAckHistogram.getValueAtPercentile(99) 
/ 1000.0),
+                        
DEC.format(reportAckHistogram.getValueAtPercentile(99.9) / 1000.0),
+                        
DEC.format(reportAckHistogram.getValueAtPercentile(99.99) / 1000.0),
+                        DEC.format(reportAckHistogram.getMaxValue() / 1000.0));
+
+                
histogramLogWriter.outputIntervalHistogram(reportSendHistogram);
+                histogramLogWriter.outputIntervalHistogram(reportAckHistogram);
+                reportSendHistogram.reset();
+                reportAckHistogram.reset();
+
+                oldTime = now;
             }
-            long now = System.nanoTime();
-            double elapsed = (now - oldTime) / 1e9;
-            long total = totalNumEndTxnOpFailed.sum() + 
totalNumTxnOpenTxnSuccess.sum();
-            double rate = numTxnOpSuccess.sumThenReset() / elapsed;
-            reportSendHistogram = 
messageSendRecorder.getIntervalHistogram(reportSendHistogram);
-            reportAckHistogram = 
messageAckRecorder.getIntervalHistogram(reportAckHistogram);
-            String txnOrTaskLog = !arguments.isDisableTransaction
-                    ? "Throughput transaction: {} transaction executes --- {} 
transaction/s"
-                    : "Throughput task: {} task executes --- {} task/s";
-            log.info(
-                    txnOrTaskLog + "  --- send Latency: mean: {} ms - med: {} "
-                            + "- 95pct: {} - 99pct: {} - 99.9pct: {} - 
99.99pct: {} - Max: {}" + " --- ack Latency: "
-                            + "mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 
99.9pct: {} - 99.99pct: {} - Max: {}",
-                    INTFORMAT.format(total),
-                    DEC.format(rate),
-                    DEC.format(reportSendHistogram.getMean() / 1000.0),
-                    DEC.format(reportSendHistogram.getValueAtPercentile(50) / 
1000.0),
-                    DEC.format(reportSendHistogram.getValueAtPercentile(95) / 
1000.0),
-                    DEC.format(reportSendHistogram.getValueAtPercentile(99) / 
1000.0),
-                    DEC.format(reportSendHistogram.getValueAtPercentile(99.9) 
/ 1000.0),
-                    DEC.format(reportSendHistogram.getValueAtPercentile(99.99) 
/ 1000.0),
-                    DEC.format(reportSendHistogram.getMaxValue() / 1000.0),
-                    DEC.format(reportAckHistogram.getMean() / 1000.0),
-                    DEC.format(reportAckHistogram.getValueAtPercentile(50) / 
1000.0),
-                    DEC.format(reportAckHistogram.getValueAtPercentile(95) / 
1000.0),
-                    DEC.format(reportAckHistogram.getValueAtPercentile(99) / 
1000.0),
-                    DEC.format(reportAckHistogram.getValueAtPercentile(99.9) / 
1000.0),
-                    DEC.format(reportAckHistogram.getValueAtPercentile(99.99) 
/ 1000.0),
-                    DEC.format(reportAckHistogram.getMaxValue() / 1000.0));
-
-            histogramLogWriter.outputIntervalHistogram(reportSendHistogram);
-            histogramLogWriter.outputIntervalHistogram(reportAckHistogram);
-            reportSendHistogram.reset();
-            reportAckHistogram.reset();
-
-            oldTime = now;
         }
-
-
     }
 
 
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index bf2fade1c67..20679d83676 100644
--- 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -169,9 +170,9 @@ public class PerformanceProducerTest extends 
MockedPulsarServiceBaseTest {
     @Test(timeOut = 20000)
     public void testBatchingDisabled() throws Exception {
         PerformanceProducer.Arguments arguments = new 
PerformanceProducer.Arguments();
-        
+
         int producerId = 0;
-        
+
         String topic = testTopic + UUID.randomUUID();
         arguments.topics = List.of(topic);
         arguments.msgRate = 10;
@@ -181,8 +182,9 @@ public class PerformanceProducerTest extends 
MockedPulsarServiceBaseTest {
 
         ClientBuilder clientBuilder = 
PerfClientUtils.createClientBuilderFromArguments(arguments)
                 .enableTransaction(arguments.isEnableTransaction);
+        @Cleanup
         PulsarClient client = clientBuilder.build();
-        
+
         ProducerBuilderImpl<byte[]> builder = (ProducerBuilderImpl<byte[]>) 
PerformanceProducer.createProducerBuilder(client, arguments, producerId);
         Assert.assertFalse(builder.getConf().isBatchingEnabled());
     }
diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 3b831ad38ba..1e10db9753a 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -18,9 +18,20 @@
  */
 package org.apache.pulsar.transaction.coordinator;
 
+import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
+import static 
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -35,41 +46,34 @@ import org.apache.pulsar.common.util.FutureUtil;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import 
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
-import static 
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
-    private HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
-            1, TimeUnit.MILLISECONDS);
+    private HashedWheelTimer transactionTimer;
 
     public MLTransactionMetadataStoreTest() {
         super(3);
     }
 
+    @BeforeClass
+    public void initTimer() {
+        transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+    }
+
     @AfterClass
-    public void cleanup(){
+    public void cleanupTimer(){
         transactionTimer.stop();
     }
 
@@ -84,9 +88,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+        @Cleanup("closeAsync")
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(),
@@ -172,9 +178,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         managedLedgerConfig.setMaxEntriesPerLedger(3);
+        @Cleanup("closeAsync")
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig, disabledBufferedWriter, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -203,6 +211,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig, disabledBufferedWriter, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+        transactionMetadataStore.closeAsync();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -229,10 +238,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         managedLedgerConfig.setMaxEntriesPerLedger(2);
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+        @Cleanup("closeAsync")
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
 
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -284,6 +295,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                         DISABLED_BUFFERED_WRITER_METRICS);
                 txnLog2.initialize().get(2, TimeUnit.SECONDS);
 
+                @Cleanup("closeAsync")
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
                                 txnLog2, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -357,9 +369,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+        @Cleanup("closeAsync")
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -435,9 +449,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+        @Cleanup("closeAsync")
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -456,6 +472,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+        transactionMetadataStore.closeAsync();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -475,9 +492,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
+        @Cleanup("closeAsync")
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig, txnLogBufferedWriterConfig, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
         mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+        @Cleanup("closeAsync")
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
index 8119c2f1f81..1ce858ec4a1 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Future;
 import javax.servlet.http.HttpServletRequest;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
@@ -56,12 +57,13 @@ import org.testng.annotations.Test;
  */
 public class PingPongSupportTest {
 
-    private static Server server;
+    private Server server;
 
-    private static final WebExecutorThreadPool executor = new WebExecutorThreadPool(6, "pulsar-websocket-web-test");
+    private WebExecutorThreadPool executor;
 
     @BeforeClass
-    public static void setup() throws Exception {
+    public void setup() throws Exception {
+        executor = new WebExecutorThreadPool(6, "pulsar-websocket-web-test");
         server = new Server(executor);
         List<ServerConnector> connectors = new ArrayList<>();
         ServerConnector connector = new ServerConnector(server);
@@ -90,7 +92,7 @@ public class PingPongSupportTest {
     }
 
     @AfterClass(alwaysRun = true)
-    public static void tearDown() throws Exception {
+    public void tearDown() throws Exception {
         if (server != null) {
             server.stop();
         }
@@ -108,6 +110,7 @@ public class PingPongSupportTest {
 
     @Test(dataProvider = "endpoint")
     public void testPingPong(String endpoint) throws Exception {
+        @Cleanup("stop")
         HttpClient httpClient = new HttpClient();
         WebSocketClient webSocketClient = new WebSocketClient(httpClient);
         webSocketClient.start();
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 67e5cfd3560..56612adc1ef 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
 import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.Recycler;
 import java.io.IOException;
 import java.util.Iterator;
@@ -402,5 +403,8 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                 log.error("FileSystemManagedLedgerOffloader close failed!", e);
             }
         }
+        if (assignmentScheduler != null) {
+            MoreExecutors.shutdownAndAwaitTermination(assignmentScheduler, 5, TimeUnit.SECONDS);
+        }
     }
 }
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 477a03e2ca5..9609362e9b7 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.offload.filesystem;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Properties;
 import java.util.concurrent.Executors;
@@ -53,12 +54,15 @@ public abstract class FileStoreTestBase {
     }
 
     @AfterClass(alwaysRun = true)
-    public final void afterClass() {
+    public final void afterClass() throws IOException {
         cleanup();
     }
 
-    public void cleanup() {
-        scheduler.shutdownNow();
+    public void cleanup() throws IOException {
+        if (scheduler != null) {
+            scheduler.shutdownNow();
+            scheduler = null;
+        }
     }
 
     @BeforeMethod(alwaysRun = true)
@@ -66,6 +70,7 @@ public abstract class FileStoreTestBase {
         File baseDir = Files.createTempDirectory(basePath).toFile().getAbsoluteFile();
         Configuration conf = new Configuration();
         conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+        conf.set("dfs.namenode.gc.time.monitor.enable", "false");
         MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
         hdfsCluster = builder.build();
 
@@ -80,9 +85,18 @@ public abstract class FileStoreTestBase {
 
     @AfterMethod(alwaysRun = true)
     public void tearDown() {
-        hdfsCluster.shutdown(true, true);
-        hdfsCluster.close();
-        scheduledExecutorService.shutdownNow();
+        if (fileSystemManagedLedgerOffloader != null) {
+            fileSystemManagedLedgerOffloader.close();
+            fileSystemManagedLedgerOffloader = null;
+        }
+        if (hdfsCluster != null) {
+            hdfsCluster.shutdown(true, true);
+            hdfsCluster = null;
+        }
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+            scheduledExecutorService = null;
+        }
     }
 
     public String getURI() {
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index 7276be51217..71fe5ec7219 100644
--- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -41,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -62,6 +64,14 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
         map.put("ManagedLedgerName", managedLedgerName);
     }
 
+    @Override
+    public void cleanup() throws IOException {
+        if (bk != null) {
+            bk.shutdown();
+        }
+        super.cleanup();
+    }
+
     private ReadHandle buildReadHandle() throws Exception {
 
         lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes());
@@ -86,6 +96,12 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
         super.start();
     }
 
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void tearDown() {
+        super.tearDown();
+    }
+
     @Test
     public void testOffloadAndRead() throws Exception {
         LedgerOffloader offloader = fileSystemManagedLedgerOffloader;

Reply via email to