This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cad4e750296 [fix][test] Fix thread leaks in tests by closing executors
properly (#21425)
cad4e750296 is described below
commit cad4e750296122b30e7beb8185e27232b21840b6
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Oct 24 07:27:36 2023 +0300
[fix][test] Fix thread leaks in tests by closing executors properly (#21425)
---
.../channel/ServiceUnitStateChannelTest.java | 2 ++
.../extensions/scheduler/UnloadSchedulerTest.java | 6 +++-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 2 ++
.../pulsar/broker/transaction/TransactionTest.java | 2 ++
.../pulsar/client/api/ClientDeduplicationTest.java | 2 ++
.../apache/pulsar/client/impl/ClientCnxTest.java | 5 +--
.../pulsar/client/impl/ConnectionHandlerTest.java | 5 +--
.../apache/pulsar/client/impl/RetryUtilTest.java | 5 +--
.../java/org/apache/pulsar/schema/SchemaTest.java | 2 +-
...roxyPublishConsumeClientSideEncryptionTest.java | 5 ++-
.../apache/pulsar/common/util/FutureUtilTest.java | 1 +
.../pulsar/common/util/TrustManagerProxyTest.java | 16 ++++-----
.../offload/filesystem/FileStoreTestBase.java | 38 +++++++++++++++++-----
.../impl/FileSystemManagedLedgerOffloaderTest.java | 23 ++++++-------
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 38 ++++++++++++++--------
15 files changed, 101 insertions(+), 51 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 57d4537bdeb..31705264fba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -70,6 +70,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarServerException;
@@ -234,6 +235,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var channel = createChannel(pulsar);
int errorCnt = validateChannelStart(channel);
assertEquals(6, errorCnt);
+ @Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
Future startFuture = executor.submit(() -> {
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
index 38d4e9904e6..1fd89ba882b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
+import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
@@ -127,6 +128,8 @@ public class UnloadSchedulerTest {
PulsarService pulsar = mock(PulsarService.class);
NamespaceUnloadStrategy unloadStrategy =
mock(NamespaceUnloadStrategy.class);
doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newFixedThreadPool(1);
doAnswer(__ -> CompletableFuture.supplyAsync(() -> {
try {
// Delay 5 seconds to finish.
@@ -135,9 +138,10 @@ public class UnloadSchedulerTest {
throw new RuntimeException(e);
}
return Lists.newArrayList("broker-1", "broker-2");
- },
Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync();
+ }, executor)).when(registry).getAvailableBrokersAsync();
UnloadScheduler scheduler = new UnloadScheduler(pulsar,
loadManagerExecutor, unloadManager, context,
channel, unloadStrategy, counter, reference);
+ @Cleanup("shutdownNow")
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch latch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 5b70ff99675..f0255a13cbd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -361,6 +362,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
TopicPolicies initPolicy = TopicPolicies.builder()
.maxConsumerPerTopic(10)
.build();
+ @Cleanup("shutdownNow")
ScheduledExecutorService executors =
Executors.newScheduledThreadPool(1);
executors.schedule(new Runnable() {
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 905da9379ec..ee7a2e2d0b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -358,6 +358,7 @@ public class TransactionTest extends TransactionTestBase {
int threadSize = 30;
String topicName = "subscription";
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+ @Cleanup("shutdownNow")
ExecutorService executorService =
Executors.newFixedThreadPool(threadSize);
//build producer/consumer
@@ -1451,6 +1452,7 @@ public class TransactionTest extends TransactionTestBase {
public void testPendingAckReplayChangeStateError() throws
InterruptedException, TimeoutException {
AtomicInteger atomicInteger = new AtomicInteger(1);
// Create Executor
+ @Cleanup("shutdownNow")
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
// Mock serviceConfiguration.
ServiceConfiguration serviceConfiguration =
mock(ServiceConfiguration.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
index d2f9617a5fa..4e96252056d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -382,6 +383,7 @@ public class ClientDeduplicationTest extends
ProducerConsumerBase {
int totalMessage = 200;
int threadSize = 5;
String topicName = "subscription";
+ @Cleanup("shutdownNow")
ExecutorService executorService =
Executors.newFixedThreadPool(threadSize);
conf.setBrokerDeduplicationEnabled(true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index d2f610ae53f..dfd52d494ae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -43,7 +43,7 @@ public class ClientCnxTest extends
MockedPulsarServiceBaseTest {
public static final String TENANT = "tnx";
public static final String NAMESPACE = TENANT + "/ns1";
public static String persistentTopic = "persistent://" + NAMESPACE +
"/test";
- ExecutorService executorService = Executors.newFixedThreadPool(20);
+ ExecutorService executorService;
@BeforeClass
@Override
@@ -54,13 +54,14 @@ public class ClientCnxTest extends
MockedPulsarServiceBaseTest {
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE);
+ executorService = Executors.newFixedThreadPool(20);
}
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
- this.executorService.shutdown();
+ this.executorService.shutdownNow();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
index f29d62db5f4..d61dc3442dc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -47,20 +47,21 @@ public class ConnectionHandlerTest extends
ProducerConsumerBase {
private static final Backoff BACKOFF = new
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
.setMandatoryStop(1, TimeUnit.SECONDS)
.setMax(3, TimeUnit.SECONDS).create();
- private final ExecutorService executor = Executors.newFixedThreadPool(4);
+ private ExecutorService executor;
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
+ executor = Executors.newFixedThreadPool(4);
}
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
- executor.shutdown();
+ executor.shutdownNow();
}
@Test(timeOut = 30000)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
index 604c468b1de..f7a0485a512 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import lombok.Cleanup;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;
@@ -37,6 +38,7 @@ public class RetryUtilTest {
@Test
public void testFailAndRetry() throws Exception {
+ @Cleanup("shutdownNow")
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
CompletableFuture<Boolean> callback = new CompletableFuture<>();
AtomicInteger atomicInteger = new AtomicInteger(0);
@@ -57,11 +59,11 @@ public class RetryUtilTest {
}, backoff, executor, callback);
assertTrue(callback.get());
assertEquals(atomicInteger.get(), 5);
- executor.shutdownNow();
}
@Test
public void testFail() throws Exception {
+ @Cleanup("shutdownNow")
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
CompletableFuture<Boolean> callback = new CompletableFuture<>();
Backoff backoff = new BackoffBuilder()
@@ -79,6 +81,5 @@ public class RetryUtilTest {
}
long time = System.currentTimeMillis() - start;
assertTrue(time >= 5000 - 2000, "Duration:" + time);
- executor.shutdownNow();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 7eae6462545..d4ef041f6de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -1324,6 +1324,7 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
final String topic = getTopicName(ns, "testCreateSchemaInParallel");
+ @Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(16);
List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = new
ArrayList<>(16);
CountDownLatch latch = new CountDownLatch(16);
@@ -1365,7 +1366,6 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
});
producers.clear();
producers2.clear();
- executor.shutdownNow();
}
@EqualsAndHashCode
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
index ad69df4757f..e36d9d2a194 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
@@ -58,7 +58,7 @@ import org.testng.annotations.Test;
@Test(groups = "websocket")
public class ProxyPublishConsumeClientSideEncryptionTest extends
ProducerConsumerBase {
private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
- private static final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(1);
+ private ScheduledExecutorService executor;
private static final Charset charset = Charset.defaultCharset();
private ProxyServer proxyServer;
@@ -66,6 +66,8 @@ public class ProxyPublishConsumeClientSideEncryptionTest
extends ProducerConsume
@BeforeClass
public void setup() throws Exception {
+ executor = Executors.newScheduledThreadPool(1);
+
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
super.internalSetup();
@@ -92,6 +94,7 @@ public class ProxyPublishConsumeClientSideEncryptionTest
extends ProducerConsume
if (proxyServer != null) {
proxyServer.stop();
}
+ executor.shutdownNow();
log.info("Finished Cleaning Up Test setup");
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
index 7d44c187d73..09ce9f9f137 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
@@ -183,6 +183,7 @@ public class FutureUtilTest {
public void testSequencer() {
int concurrentNum = 1000;
+ @Cleanup("shutdownNow")
final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(concurrentNum);
final FutureUtil.Sequencer<Void> sequencer =
FutureUtil.Sequencer.create();
// normal case -- allowExceptionBreakChain=false
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/TrustManagerProxyTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/TrustManagerProxyTest.java
index 8114f9b9356..ab31740bd5f 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/TrustManagerProxyTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/TrustManagerProxyTest.java
@@ -25,6 +25,7 @@ import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import lombok.Cleanup;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -41,15 +42,12 @@ public class TrustManagerProxyTest {
public void testLoadCA(String path, int count) {
String caPath = Resources.getResource(path).getPath();
+ @Cleanup("shutdownNow")
ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
- try {
- TrustManagerProxy trustManagerProxy =
- new TrustManagerProxy(caPath, 120, scheduledExecutor);
- X509Certificate[] x509Certificates =
trustManagerProxy.getAcceptedIssuers();
- assertNotNull(x509Certificates);
- assertEquals(Arrays.stream(x509Certificates).count(), count);
- } finally {
- scheduledExecutor.shutdown();
- }
+ TrustManagerProxy trustManagerProxy =
+ new TrustManagerProxy(caPath, 120, scheduledExecutor);
+ X509Certificate[] x509Certificates =
trustManagerProxy.getAcceptedIssuers();
+ assertNotNull(x509Certificates);
+ assertEquals(Arrays.stream(x509Certificates).count(), count);
}
}
diff --git
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
index 3e6cd8745dc..477a03e2ca5 100644
---
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
+++
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -18,28 +18,48 @@
*/
package org.apache.bookkeeper.mledger.offload.filesystem;
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import
org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Properties;
-import java.util.concurrent.Executors;
-
public abstract class FileStoreTestBase {
protected FileSystemManagedLedgerOffloader
fileSystemManagedLedgerOffloader;
- protected OrderedScheduler scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+ protected OrderedScheduler scheduler;
protected final String basePath = "pulsar";
private MiniDFSCluster hdfsCluster;
private String hdfsURI;
protected LedgerOffloaderStats offloaderStats;
+ private ScheduledExecutorService scheduledExecutorService;
+
+ @BeforeClass(alwaysRun = true)
+ public final void beforeClass() throws Exception {
+ init();
+ }
+
+ public void init() throws Exception {
+ scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public final void afterClass() {
+ cleanup();
+ }
+
+ public void cleanup() {
+ scheduler.shutdownNow();
+ }
@BeforeMethod(alwaysRun = true)
public void start() throws Exception {
@@ -51,7 +71,8 @@ public abstract class FileStoreTestBase {
hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/";
Properties properties = new Properties();
- this.offloaderStats = LedgerOffloaderStats.create(true, true,
Executors.newScheduledThreadPool(1), 60);
+ scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ this.offloaderStats = LedgerOffloaderStats.create(true, true,
scheduledExecutorService, 60);
fileSystemManagedLedgerOffloader = new
FileSystemManagedLedgerOffloader(
OffloadPoliciesImpl.create(properties),
scheduler, hdfsURI, basePath, offloaderStats);
@@ -61,6 +82,7 @@ public abstract class FileStoreTestBase {
public void tearDown() {
hdfsCluster.shutdown(true, true);
hdfsCluster.close();
+ scheduledExecutorService.shutdownNow();
}
public String getURI() {
diff --git
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
index b9de5d1a49e..7276be51217 100644
---
a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
+++
b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -19,6 +19,14 @@
package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
@@ -35,18 +43,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
- private final PulsarMockBookKeeper bk;
+ private PulsarMockBookKeeper bk;
private String managedLedgerName = "public/default/persistent/testOffload";
private String topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
private String storagePath = createStoragePath(managedLedgerName);
@@ -55,7 +54,9 @@ public class FileSystemManagedLedgerOffloaderTest extends
FileStoreTestBase {
private final int numberOfEntries = 601;
private Map<String, String> map = new HashMap<>();
- public FileSystemManagedLedgerOffloaderTest() throws Exception {
+ @Override
+ public void init() throws Exception {
+ super.init();
this.bk = new PulsarMockBookKeeper(scheduler);
this.toWrite = buildReadHandle();
map.put("ManagedLedgerName", managedLedgerName);
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index ac87a8e4240..bb4cb286680 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -23,8 +23,8 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,6 +38,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -45,8 +46,8 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
+import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import
org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.pulsar.common.naming.TopicName;
@@ -56,12 +57,14 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerOffloaderBase {
private static final Logger log =
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class);
+ private final ScheduledExecutorService scheduledExecutorService;
private TieredStorageConfiguration mockedConfig;
private final LedgerOffloaderStats offloaderStats;
@@ -72,13 +75,20 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
assertNotNull(provider);
provider.validate(config);
blobStore = provider.getBlobStore(config);
- this.offloaderStats = LedgerOffloaderStats.create(true, true,
Executors.newScheduledThreadPool(1), 60);
+ scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ this.offloaderStats = LedgerOffloaderStats.create(true, true,
scheduledExecutorService, 60);
+ }
+
+ @AfterClass(alwaysRun = true)
+ protected void cleanupInstance() throws Exception {
+ offloaderStats.close();
+ scheduledExecutorService.shutdownNow();
}
private BlobStoreManagedLedgerOffloader getOffloader() throws IOException {
return getOffloader(BUCKET);
}
-
+
private BlobStoreManagedLedgerOffloader getOffloader(BlobStore
mockedBlobStore) throws IOException {
return getOffloader(BUCKET, mockedBlobStore);
}
@@ -89,10 +99,10 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, this.offloaderStats);
return offloader;
}
-
+
private BlobStoreManagedLedgerOffloader getOffloader(String bucket,
BlobStore mockedBlobStore) throws IOException {
mockedConfig = mock(TieredStorageConfiguration.class,
delegatesTo(getConfiguration(bucket)));
- Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
+ Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
BlobStoreManagedLedgerOffloader offloader =
BlobStoreManagedLedgerOffloader.create(mockedConfig, new
HashMap<String,String>(), scheduler, this.offloaderStats);
return offloader;
}
@@ -209,9 +219,9 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
String failureString = "fail InitDataBlockUpload";
// mock throw exception when initiateMultipartUpload
- try {
+ try {
BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
-
+
Mockito
.doThrow(new RuntimeException(failureString))
.when(spiedBlobStore).initiateMultipartUpload(any(), any(),
any());
@@ -235,7 +245,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
// mock throw exception when uploadPart
try {
-
+
BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
Mockito
.doThrow(new RuntimeException(failureString))
@@ -269,7 +279,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
.when(spiedBlobStore).abortMultipartUpload(any());
BlobStoreManagedLedgerOffloader offloader =
getOffloader(spiedBlobStore);
- offloader.offload(readHandle, uuid, new HashMap<>()).get();
+ offloader.offload(readHandle, uuid, new HashMap<>()).get();
Assert.fail("Should throw exception for when
completeMultipartUpload");
} catch (Exception e) {
@@ -288,7 +298,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
String failureString = "fail putObject";
// mock throw exception when putObject
- try {
+ try {
BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
Mockito
.doThrow(new RuntimeException(failureString))
@@ -380,7 +390,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
public void testDeleteOffloaded() throws Exception {
ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
UUID uuid = UUID.randomUUID();
-
+
BlobStoreManagedLedgerOffloader offloader = getOffloader();
// verify object exist after offload
@@ -399,13 +409,13 @@ public class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreManagedLedgerO
String failureString = "fail deleteOffloaded";
ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
UUID uuid = UUID.randomUUID();
-
+
BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
Mockito
.doThrow(new RuntimeException(failureString))
.when(spiedBlobStore).removeBlobs(any(), any());
-
+
BlobStoreManagedLedgerOffloader offloader =
getOffloader(spiedBlobStore);
try {