This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cad4e750296 [fix][test] Fix thread leaks in tests by closing executors 
properly (#21425)
cad4e750296 is described below

commit cad4e750296122b30e7beb8185e27232b21840b6
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Oct 24 07:27:36 2023 +0300

    [fix][test] Fix thread leaks in tests by closing executors properly (#21425)
---
 .../channel/ServiceUnitStateChannelTest.java       |  2 ++
 .../extensions/scheduler/UnloadSchedulerTest.java  |  6 +++-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  2 ++
 .../pulsar/broker/transaction/TransactionTest.java |  2 ++
 .../pulsar/client/api/ClientDeduplicationTest.java |  2 ++
 .../apache/pulsar/client/impl/ClientCnxTest.java   |  5 +--
 .../pulsar/client/impl/ConnectionHandlerTest.java  |  5 +--
 .../apache/pulsar/client/impl/RetryUtilTest.java   |  5 +--
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  2 +-
 ...roxyPublishConsumeClientSideEncryptionTest.java |  5 ++-
 .../apache/pulsar/common/util/FutureUtilTest.java  |  1 +
 .../pulsar/common/util/TrustManagerProxyTest.java  | 16 ++++-----
 .../offload/filesystem/FileStoreTestBase.java      | 38 +++++++++++++++++-----
 .../impl/FileSystemManagedLedgerOffloaderTest.java | 23 ++++++-------
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 38 ++++++++++++++--------
 15 files changed, 101 insertions(+), 51 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 57d4537bdeb..31705264fba 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -70,6 +70,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -234,6 +235,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var channel = createChannel(pulsar);
         int errorCnt = validateChannelStart(channel);
         assertEquals(6, errorCnt);
+        @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newSingleThreadExecutor();
         Future startFuture = executor.submit(() -> {
             try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
index 38d4e9904e6..1fd89ba882b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import com.google.common.collect.Lists;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
@@ -127,6 +128,8 @@ public class UnloadSchedulerTest {
         PulsarService pulsar = mock(PulsarService.class);
         NamespaceUnloadStrategy unloadStrategy = 
mock(NamespaceUnloadStrategy.class);
         
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newFixedThreadPool(1);
         doAnswer(__ -> CompletableFuture.supplyAsync(() -> {
                 try {
                     // Delay 5 seconds to finish.
@@ -135,9 +138,10 @@ public class UnloadSchedulerTest {
                     throw new RuntimeException(e);
                 }
                 return Lists.newArrayList("broker-1", "broker-2");
-            }, 
Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync();
+            }, executor)).when(registry).getAvailableBrokersAsync();
         UnloadScheduler scheduler = new UnloadScheduler(pulsar, 
loadManagerExecutor, unloadManager, context,
                 channel, unloadStrategy, counter, reference);
+        @Cleanup("shutdownNow")
         ExecutorService executorService = Executors.newFixedThreadPool(5);
         CountDownLatch latch = new CountDownLatch(5);
         for (int i = 0; i < 5; i++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 5b70ff99675..f0255a13cbd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -361,6 +362,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         TopicPolicies initPolicy = TopicPolicies.builder()
                 .maxConsumerPerTopic(10)
                 .build();
+        @Cleanup("shutdownNow")
         ScheduledExecutorService executors = 
Executors.newScheduledThreadPool(1);
         executors.schedule(new Runnable() {
             @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 905da9379ec..ee7a2e2d0b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -358,6 +358,7 @@ public class TransactionTest extends TransactionTestBase {
         int threadSize = 30;
         String topicName = "subscription";
         
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+        @Cleanup("shutdownNow")
         ExecutorService executorService = 
Executors.newFixedThreadPool(threadSize);
 
         //build producer/consumer
@@ -1451,6 +1452,7 @@ public class TransactionTest extends TransactionTestBase {
     public void testPendingAckReplayChangeStateError() throws 
InterruptedException, TimeoutException {
         AtomicInteger atomicInteger = new AtomicInteger(1);
         // Create Executor
+        @Cleanup("shutdownNow")
         ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
         // Mock serviceConfiguration.
         ServiceConfiguration serviceConfiguration = 
mock(ServiceConfiguration.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index d2f9617a5fa..4e96252056d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -382,6 +383,7 @@ public class ClientDeduplicationTest extends 
ProducerConsumerBase {
         int totalMessage = 200;
         int threadSize = 5;
         String topicName = "subscription";
+        @Cleanup("shutdownNow")
         ExecutorService executorService = 
Executors.newFixedThreadPool(threadSize);
         conf.setBrokerDeduplicationEnabled(true);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index d2f610ae53f..dfd52d494ae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -43,7 +43,7 @@ public class ClientCnxTest extends 
MockedPulsarServiceBaseTest {
     public static final String TENANT = "tnx";
     public static final String NAMESPACE = TENANT + "/ns1";
     public static String persistentTopic = "persistent://" + NAMESPACE + 
"/test";
-    ExecutorService executorService = Executors.newFixedThreadPool(20);
+    ExecutorService executorService;
 
     @BeforeClass
     @Override
@@ -54,13 +54,14 @@ public class ClientCnxTest extends 
MockedPulsarServiceBaseTest {
         admin.tenants().createTenant(TENANT,
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace(NAMESPACE);
+        executorService = Executors.newFixedThreadPool(20);
     }
 
     @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
-        this.executorService.shutdown();
+        this.executorService.shutdownNow();
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
index f29d62db5f4..d61dc3442dc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -47,20 +47,21 @@ public class ConnectionHandlerTest extends 
ProducerConsumerBase {
     private static final Backoff BACKOFF = new 
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
             .setMandatoryStop(1, TimeUnit.SECONDS)
             .setMax(3, TimeUnit.SECONDS).create();
-    private final ExecutorService executor = Executors.newFixedThreadPool(4);
+    private ExecutorService executor;
 
     @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
+        executor = Executors.newFixedThreadPool(4);
     }
 
     @AfterClass
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
-        executor.shutdown();
+        executor.shutdownNow();
     }
 
     @Test(timeOut = 30000)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
index 604c468b1de..f7a0485a512 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import lombok.Cleanup;
 import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.Test;
@@ -37,6 +38,7 @@ public class RetryUtilTest {
 
     @Test
     public void testFailAndRetry() throws Exception {
+        @Cleanup("shutdownNow")
         ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
         CompletableFuture<Boolean> callback = new CompletableFuture<>();
         AtomicInteger atomicInteger = new AtomicInteger(0);
@@ -57,11 +59,11 @@ public class RetryUtilTest {
         }, backoff, executor, callback);
         assertTrue(callback.get());
         assertEquals(atomicInteger.get(), 5);
-        executor.shutdownNow();
     }
 
     @Test
     public void testFail() throws Exception {
+        @Cleanup("shutdownNow")
         ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
         CompletableFuture<Boolean> callback = new CompletableFuture<>();
         Backoff backoff = new BackoffBuilder()
@@ -79,6 +81,5 @@ public class RetryUtilTest {
         }
         long time = System.currentTimeMillis() - start;
         assertTrue(time >= 5000 - 2000, "Duration:" + time);
-        executor.shutdownNow();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 7eae6462545..d4ef041f6de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -1324,6 +1324,7 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
 
         final String topic = getTopicName(ns, "testCreateSchemaInParallel");
+        @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newFixedThreadPool(16);
         List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = new 
ArrayList<>(16);
         CountDownLatch latch = new CountDownLatch(16);
@@ -1365,7 +1366,6 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         });
         producers.clear();
         producers2.clear();
-        executor.shutdownNow();
     }
 
     @EqualsAndHashCode
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
index ad69df4757f..e36d9d2a194 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
@@ -58,7 +58,7 @@ import org.testng.annotations.Test;
 @Test(groups = "websocket")
 public class ProxyPublishConsumeClientSideEncryptionTest extends 
ProducerConsumerBase {
     private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
-    private static final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(1);
+    private ScheduledExecutorService executor;
     private static final Charset charset = Charset.defaultCharset();
 
     private ProxyServer proxyServer;
@@ -66,6 +66,8 @@ public class ProxyPublishConsumeClientSideEncryptionTest 
extends ProducerConsume
 
     @BeforeClass
     public void setup() throws Exception {
+        executor = Executors.newScheduledThreadPool(1);
+
         
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
 
         super.internalSetup();
@@ -92,6 +94,7 @@ public class ProxyPublishConsumeClientSideEncryptionTest 
extends ProducerConsume
         if (proxyServer != null) {
             proxyServer.stop();
         }
+        executor.shutdownNow();
         log.info("Finished Cleaning Up Test setup");
     }
 
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
index 7d44c187d73..09ce9f9f137 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
@@ -183,6 +183,7 @@ public class FutureUtilTest {
 
     public void testSequencer() {
         int concurrentNum = 1000;
+        @Cleanup("shutdownNow")
         final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(concurrentNum);
         final FutureUtil.Sequencer<Void> sequencer = 
FutureUtil.Sequencer.create();
         // normal case -- allowExceptionBreakChain=false
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/TrustManagerProxyTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/TrustManagerProxyTest.java
index 8114f9b9356..ab31740bd5f 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/TrustManagerProxyTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/TrustManagerProxyTest.java
@@ -25,6 +25,7 @@ import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import lombok.Cleanup;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -41,15 +42,12 @@ public class TrustManagerProxyTest {
     public void testLoadCA(String path, int count) {
         String caPath = Resources.getResource(path).getPath();
 
+        @Cleanup("shutdownNow")
         ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor();
-        try {
-            TrustManagerProxy trustManagerProxy =
-                    new TrustManagerProxy(caPath, 120, scheduledExecutor);
-            X509Certificate[] x509Certificates = 
trustManagerProxy.getAcceptedIssuers();
-            assertNotNull(x509Certificates);
-            assertEquals(Arrays.stream(x509Certificates).count(), count);
-        } finally {
-            scheduledExecutor.shutdown();
-        }
+        TrustManagerProxy trustManagerProxy =
+                new TrustManagerProxy(caPath, 120, scheduledExecutor);
+        X509Certificate[] x509Certificates = 
trustManagerProxy.getAcceptedIssuers();
+        assertNotNull(x509Certificates);
+        assertEquals(Arrays.stream(x509Certificates).count(), count);
     }
 }
diff --git 
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
 
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 3e6cd8745dc..477a03e2ca5 100644
--- 
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++ 
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -18,28 +18,48 @@
  */
 package org.apache.bookkeeper.mledger.offload.filesystem;
 
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import 
org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-
 public abstract class FileStoreTestBase {
     protected FileSystemManagedLedgerOffloader 
fileSystemManagedLedgerOffloader;
-    protected OrderedScheduler scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+    protected OrderedScheduler scheduler;
     protected final String basePath = "pulsar";
     private MiniDFSCluster hdfsCluster;
     private String hdfsURI;
     protected LedgerOffloaderStats offloaderStats;
+    private ScheduledExecutorService scheduledExecutorService;
+
+    @BeforeClass(alwaysRun = true)
+    public final void beforeClass() throws Exception {
+        init();
+    }
+
+    public void init() throws Exception {
+        scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public final void afterClass() {
+        cleanup();
+    }
+
+    public void cleanup() {
+        scheduler.shutdownNow();
+    }
 
     @BeforeMethod(alwaysRun = true)
     public void start() throws Exception {
@@ -51,7 +71,8 @@ public abstract class FileStoreTestBase {
 
         hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/";
         Properties properties = new Properties();
-        this.offloaderStats = LedgerOffloaderStats.create(true, true, 
Executors.newScheduledThreadPool(1), 60);
+        scheduledExecutorService = Executors.newScheduledThreadPool(1);
+        this.offloaderStats = LedgerOffloaderStats.create(true, true, 
scheduledExecutorService, 60);
         fileSystemManagedLedgerOffloader = new 
FileSystemManagedLedgerOffloader(
                 OffloadPoliciesImpl.create(properties),
                 scheduler, hdfsURI, basePath, offloaderStats);
@@ -61,6 +82,7 @@ public abstract class FileStoreTestBase {
     public void tearDown() {
         hdfsCluster.shutdown(true, true);
         hdfsCluster.close();
+        scheduledExecutorService.shutdownNow();
     }
 
     public String getURI() {
diff --git 
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
 
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index b9de5d1a49e..7276be51217 100644
--- 
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++ 
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -19,6 +19,14 @@
 package org.apache.bookkeeper.mledger.offload.filesystem.impl;
 
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
@@ -35,18 +43,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
 
 public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
-    private final PulsarMockBookKeeper bk;
+    private PulsarMockBookKeeper bk;
     private String managedLedgerName = "public/default/persistent/testOffload";
     private String topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
     private String storagePath = createStoragePath(managedLedgerName);
@@ -55,7 +54,9 @@ public class FileSystemManagedLedgerOffloaderTest extends 
FileStoreTestBase {
     private final int numberOfEntries = 601;
     private  Map<String, String> map = new HashMap<>();
 
-    public FileSystemManagedLedgerOffloaderTest() throws Exception {
+    @Override
+    public void init() throws Exception {
+        super.init();
         this.bk = new PulsarMockBookKeeper(scheduler);
         this.toWrite = buildReadHandle();
         map.put("ManagedLedgerName", managedLedgerName);
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index ac87a8e4240..bb4cb286680 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -23,8 +23,8 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -45,8 +46,8 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
 import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.pulsar.common.naming.TopicName;
@@ -56,12 +57,14 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
 public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerOffloaderBase {
 
     private static final Logger log = 
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class);
+    private final ScheduledExecutorService scheduledExecutorService;
     private TieredStorageConfiguration mockedConfig;
     private final LedgerOffloaderStats offloaderStats;
 
@@ -72,13 +75,20 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
         assertNotNull(provider);
         provider.validate(config);
         blobStore = provider.getBlobStore(config);
-        this.offloaderStats = LedgerOffloaderStats.create(true, true, 
Executors.newScheduledThreadPool(1), 60);
+        scheduledExecutorService = Executors.newScheduledThreadPool(1);
+        this.offloaderStats = LedgerOffloaderStats.create(true, true, 
scheduledExecutorService, 60);
+    }
+
+    @AfterClass(alwaysRun = true)
+    protected void cleanupInstance() throws Exception {
+        offloaderStats.close();
+        scheduledExecutorService.shutdownNow();
     }
 
     private BlobStoreManagedLedgerOffloader getOffloader() throws IOException {
         return getOffloader(BUCKET);
     }
-    
+
     private BlobStoreManagedLedgerOffloader getOffloader(BlobStore 
mockedBlobStore) throws IOException {
         return getOffloader(BUCKET, mockedBlobStore);
     }
@@ -89,10 +99,10 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
         BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader.create(mockedConfig, new 
HashMap<String,String>(), scheduler, this.offloaderStats);
         return offloader;
     }
-    
+
     private BlobStoreManagedLedgerOffloader getOffloader(String bucket, 
BlobStore mockedBlobStore) throws IOException {
         mockedConfig = mock(TieredStorageConfiguration.class, 
delegatesTo(getConfiguration(bucket)));
-        Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore(); 
+        Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
         BlobStoreManagedLedgerOffloader offloader = 
BlobStoreManagedLedgerOffloader.create(mockedConfig, new 
HashMap<String,String>(), scheduler, this.offloaderStats);
         return offloader;
     }
@@ -209,9 +219,9 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
         String failureString = "fail InitDataBlockUpload";
 
         // mock throw exception when initiateMultipartUpload
-        try {      
+        try {
             BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
-            
+
             Mockito
                 .doThrow(new RuntimeException(failureString))
                 .when(spiedBlobStore).initiateMultipartUpload(any(), any(), 
any());
@@ -235,7 +245,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
 
         // mock throw exception when uploadPart
         try {
-            
+
             BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
             Mockito
                 .doThrow(new RuntimeException(failureString))
@@ -269,7 +279,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
                 .when(spiedBlobStore).abortMultipartUpload(any());
 
             BlobStoreManagedLedgerOffloader offloader = 
getOffloader(spiedBlobStore);
-            offloader.offload(readHandle, uuid, new HashMap<>()).get();        
   
+            offloader.offload(readHandle, uuid, new HashMap<>()).get();
 
             Assert.fail("Should throw exception for when 
completeMultipartUpload");
         } catch (Exception e) {
@@ -288,7 +298,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
         String failureString = "fail putObject";
 
         // mock throw exception when putObject
-        try {     
+        try {
             BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
             Mockito
                 .doThrow(new RuntimeException(failureString))
@@ -380,7 +390,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
     public void testDeleteOffloaded() throws Exception {
         ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
         UUID uuid = UUID.randomUUID();
-        
+
         BlobStoreManagedLedgerOffloader offloader = getOffloader();
 
         // verify object exist after offload
@@ -399,13 +409,13 @@ public class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreManagedLedgerO
         String failureString = "fail deleteOffloaded";
         ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
         UUID uuid = UUID.randomUUID();
-        
+
         BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
 
         Mockito
             .doThrow(new RuntimeException(failureString))
             .when(spiedBlobStore).removeBlobs(any(), any());
-        
+
         BlobStoreManagedLedgerOffloader offloader = 
getOffloader(spiedBlobStore);
 
         try {

Reply via email to