This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c6704dfcd97 [fix][test] Fix thread leaks in Managed Ledger tests and
remove duplicate shutdown code (#21426)
c6704dfcd97 is described below
commit c6704dfcd977c790168e4bbad36ac67b555a3041
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Oct 24 13:35:04 2023 +0300
[fix][test] Fix thread leaks in Managed Ledger tests and remove duplicate
shutdown code (#21426)
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 68 ++++------------------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 8 ++-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 4 +-
.../bookkeeper/test/MockedBookKeeperTestCase.java | 10 +++-
.../broker/testcontext/PulsarTestContext.java | 3 +-
.../metadata/impl/AbstractMetadataStore.java | 4 +-
.../replication/BookKeeperClusterTestCase.java | 2 +-
.../coordinator/test/MockedBookKeeperTestCase.java | 4 +-
8 files changed, 35 insertions(+), 68 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 03605bf6e85..40e5411d777 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -535,13 +536,12 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
int numLedgers = ledgerNames.size();
log.info("Closing {} ledgers", numLedgers);
for (String ledgerName : ledgerNames) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- futures.add(future);
CompletableFuture<ManagedLedgerImpl> ledgerFuture =
ledgers.remove(ledgerName);
if (ledgerFuture == null) {
- future.complete(null);
continue;
}
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ futures.add(future);
ledgerFuture.whenCompleteAsync((managedLedger, throwable) -> {
if (throwable != null || managedLedger == null) {
future.complete(null);
@@ -606,68 +606,20 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}));
}
}));
- entryCacheManager.clear();
- return FutureUtil.waitForAll(futures).thenAccept(__ -> {
+ return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
- scheduledExecutor.shutdown();
+ scheduledExecutor.shutdownNow();
+ entryCacheManager.clear();
});
}
@Override
public void shutdown() throws InterruptedException, ManagedLedgerException
{
- if (closed) {
- throw new
ManagedLedgerException.ManagedLedgerFactoryClosedException();
+ try {
+ shutdownAsync().get();
+ } catch (ExecutionException e) {
+ throw getManagedLedgerException(e.getCause());
}
- closed = true;
-
- statsTask.cancel(true);
- flushCursorsTask.cancel(true);
- cacheEvictionExecutor.shutdownNow();
-
- // take a snapshot of ledgers currently in the map to prevent race
conditions
- List<CompletableFuture<ManagedLedgerImpl>> ledgers = new
ArrayList<>(this.ledgers.values());
- int numLedgers = ledgers.size();
- final CountDownLatch latch = new CountDownLatch(numLedgers);
- log.info("Closing {} ledgers", numLedgers);
-
- for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers) {
- ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
- if (ledger == null) {
- latch.countDown();
- continue;
- }
-
- ledger.asyncClose(new AsyncCallbacks.CloseCallback() {
- @Override
- public void closeComplete(Object ctx) {
- latch.countDown();
- }
-
- @Override
- public void closeFailed(ManagedLedgerException exception,
Object ctx) {
- log.warn("[{}] Got exception when closing managed ledger:
{}", ledger.getName(), exception);
- latch.countDown();
- }
- }, null);
- }
-
- latch.await();
- log.info("{} ledgers closed", numLedgers);
-
- if (isBookkeeperManaged) {
- try {
- BookKeeper bookkeeper = bookkeeperFactory.get();
- if (bookkeeper != null) {
- bookkeeper.close();
- }
- } catch (BKException e) {
- throw new ManagedLedgerException(e);
- }
- }
-
- scheduledExecutor.shutdownNow();
-
- entryCacheManager.clear();
}
@Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index cd61e00ccaa..5bd6c299d99 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -188,10 +188,10 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
} else {
deleteLedgerInfo.hasCalled = true;
- new Thread(() -> {
+ cachedExecutor.submit(() -> {
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
bkc.asyncDeleteLedger(ledgerId, cb, ctx);
- }).start();
+ });
}
return null;
}).when(spyBookKeeper).asyncDeleteLedger(any(long.class),
any(AsyncCallback.DeleteCallback.class), any());
@@ -208,6 +208,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
BookKeeper spyBookKeeper = spy(bkc);
+ @Cleanup("shutdown")
ManagedLedgerFactoryImpl factory = new
ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
@@ -3854,6 +3855,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
public void testInactiveLedgerRollOver() throws Exception {
int inactiveLedgerRollOverTimeMs = 5;
ManagedLedgerFactoryConfig factoryConf = new
ManagedLedgerFactoryConfig();
+ @Cleanup("shutdown")
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs,
TimeUnit.MILLISECONDS);
@@ -3885,11 +3887,11 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
assertEquals(ledgers.size(), totalAddEntries);
ledger.close();
- factory.shutdown();
}
@Test
public void testOffloadTaskCancelled() throws Exception {
+ @Cleanup("shutdown")
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(2);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 80bb6256591..0ddd04ebc48 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -240,7 +240,9 @@ public abstract class BookKeeperClusterTestCase {
zkc = zkUtil.getZooKeeperClient();
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
- MetadataStoreConfig.builder().build()));
+ MetadataStoreConfig.builder()
+ .metadataStoreName("metastore-" +
getClass().getSimpleName())
+ .build()));
}
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index e2101268b09..645563eb78c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -70,7 +71,8 @@ public abstract class MockedBookKeeperTestCase {
public final void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(
- MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().build()));
+ MetadataStoreExtended.create("memory:local",
+
MetadataStoreConfig.builder().metadataStoreName("metastore-" +
method.getName()).build()));
try {
// start bookkeeper service
@@ -102,7 +104,11 @@ public abstract class MockedBookKeeperTestCase {
}
try {
LOG.info("@@@@@@@@@ stopping " + method);
- factory.shutdownAsync().get(10, TimeUnit.SECONDS);
+ try {
+ factory.shutdownAsync().get(10, TimeUnit.SECONDS);
+ } catch
(ManagedLedgerException.ManagedLedgerFactoryClosedException e) {
+ // ignore
+ }
factory = null;
stopBookKeeper();
metadataStore.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index c927a2e61d8..2e28ea8e70e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -589,7 +589,8 @@ public class PulsarTestContext implements AutoCloseable {
} else {
try {
MetadataStoreExtended store =
MetadataStoreFactoryImpl.createExtended("memory:local",
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder()
+
.metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
registerCloseable(() -> {
store.close();
resetSpyOrMock(store);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 7bc881c1254..9ba2588a07c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -89,7 +89,9 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected abstract CompletableFuture<Boolean> existsFromStore(String path);
protected AbstractMetadataStore(String metadataStoreName) {
- this.executor = new ScheduledThreadPoolExecutor(1, new
DefaultThreadFactory(metadataStoreName));
+ this.executor = new ScheduledThreadPoolExecutor(1,
+ new DefaultThreadFactory(
+ StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName()));
registerListener(this);
this.childrenCache = Caffeine.newBuilder()
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
index c681a1f0764..9a8e3ef5a2d 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
@@ -238,7 +238,7 @@ public abstract class BookKeeperClusterTestCase {
zkc = zkUtil.getZooKeeperClient();
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
- MetadataStoreConfig.builder().build()));
+ MetadataStoreConfig.builder().metadataStoreName("metastore-" +
getClass().getSimpleName()).build()));
}
/**
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
index e0b10ca0280..ac5aa3bd892 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -71,7 +71,9 @@ public abstract class MockedBookKeeperTestCase {
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new
FaultInjectionMetadataStore(MetadataStoreExtended.create("memory:local",
- MetadataStoreConfig.builder().build()));
+ MetadataStoreConfig.builder()
+ .metadataStoreName("metastore-" + method.getName())
+ .build()));
try {
// start bookkeeper service
startBookKeeper();