This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c6704dfcd97 [fix][test] Fix thread leaks in Managed Ledger tests and remove duplicate shutdown code (#21426)
c6704dfcd97 is described below

commit c6704dfcd977c790168e4bbad36ac67b555a3041
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Oct 24 13:35:04 2023 +0300

    [fix][test] Fix thread leaks in Managed Ledger tests and remove duplicate shutdown code (#21426)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 68 ++++------------------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  8 ++-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  4 +-
 .../bookkeeper/test/MockedBookKeeperTestCase.java  | 10 +++-
 .../broker/testcontext/PulsarTestContext.java      |  3 +-
 .../metadata/impl/AbstractMetadataStore.java       |  4 +-
 .../replication/BookKeeperClusterTestCase.java     |  2 +-
 .../coordinator/test/MockedBookKeeperTestCase.java |  4 +-
 8 files changed, 35 insertions(+), 68 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 03605bf6e85..40e5411d777 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -535,13 +536,12 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         int numLedgers = ledgerNames.size();
         log.info("Closing {} ledgers", numLedgers);
         for (String ledgerName : ledgerNames) {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            futures.add(future);
             CompletableFuture<ManagedLedgerImpl> ledgerFuture = 
ledgers.remove(ledgerName);
             if (ledgerFuture == null) {
-                future.complete(null);
                 continue;
             }
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            futures.add(future);
             ledgerFuture.whenCompleteAsync((managedLedger, throwable) -> {
                 if (throwable != null || managedLedger == null) {
                     future.complete(null);
@@ -606,68 +606,20 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                 }));
             }
         }));
-        entryCacheManager.clear();
-        return FutureUtil.waitForAll(futures).thenAccept(__ -> {
+        return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> {
             //wait for tasks in scheduledExecutor executed.
-            scheduledExecutor.shutdown();
+            scheduledExecutor.shutdownNow();
+            entryCacheManager.clear();
         });
     }
 
     @Override
     public void shutdown() throws InterruptedException, ManagedLedgerException 
{
-        if (closed) {
-            throw new 
ManagedLedgerException.ManagedLedgerFactoryClosedException();
+        try {
+            shutdownAsync().get();
+        } catch (ExecutionException e) {
+            throw getManagedLedgerException(e.getCause());
         }
-        closed = true;
-
-        statsTask.cancel(true);
-        flushCursorsTask.cancel(true);
-        cacheEvictionExecutor.shutdownNow();
-
-        // take a snapshot of ledgers currently in the map to prevent race 
conditions
-        List<CompletableFuture<ManagedLedgerImpl>> ledgers = new 
ArrayList<>(this.ledgers.values());
-        int numLedgers = ledgers.size();
-        final CountDownLatch latch = new CountDownLatch(numLedgers);
-        log.info("Closing {} ledgers", numLedgers);
-
-        for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers) {
-            ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
-            if (ledger == null) {
-                latch.countDown();
-                continue;
-            }
-
-            ledger.asyncClose(new AsyncCallbacks.CloseCallback() {
-                @Override
-                public void closeComplete(Object ctx) {
-                    latch.countDown();
-                }
-
-                @Override
-                public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
-                    log.warn("[{}] Got exception when closing managed ledger: 
{}", ledger.getName(), exception);
-                    latch.countDown();
-                }
-            }, null);
-        }
-
-        latch.await();
-        log.info("{} ledgers closed", numLedgers);
-
-        if (isBookkeeperManaged) {
-            try {
-                BookKeeper bookkeeper = bookkeeperFactory.get();
-                if (bookkeeper != null) {
-                    bookkeeper.close();
-                }
-            } catch (BKException e) {
-                throw new ManagedLedgerException(e);
-            }
-        }
-
-        scheduledExecutor.shutdownNow();
-
-        entryCacheManager.clear();
     }
 
     @Override
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index cd61e00ccaa..5bd6c299d99 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -188,10 +188,10 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
                 bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
             } else {
                 deleteLedgerInfo.hasCalled = true;
-                new Thread(() -> {
+                cachedExecutor.submit(() -> {
                     
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
                     bkc.asyncDeleteLedger(ledgerId, cb, ctx);
-                }).start();
+                });
             }
             return null;
         }).when(spyBookKeeper).asyncDeleteLedger(any(long.class), 
any(AsyncCallback.DeleteCallback.class), any());
@@ -208,6 +208,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
     public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
         String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
         BookKeeper spyBookKeeper = spy(bkc);
+        @Cleanup("shutdown")
         ManagedLedgerFactoryImpl factory = new 
ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
         ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
 
@@ -3854,6 +3855,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
     public void testInactiveLedgerRollOver() throws Exception {
         int inactiveLedgerRollOverTimeMs = 5;
         ManagedLedgerFactoryConfig factoryConf = new 
ManagedLedgerFactoryConfig();
+        @Cleanup("shutdown")
         ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, 
TimeUnit.MILLISECONDS);
@@ -3885,11 +3887,11 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
         assertEquals(ledgers.size(), totalAddEntries);
         ledger.close();
-        factory.shutdown();
     }
 
     @Test
     public void testOffloadTaskCancelled() throws Exception {
+        @Cleanup("shutdown")
         ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(2);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 80bb6256591..0ddd04ebc48 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -240,7 +240,9 @@ public abstract class BookKeeperClusterTestCase {
         zkc = zkUtil.getZooKeeperClient();
         metadataStore = new FaultInjectionMetadataStore(
                 
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
-                MetadataStoreConfig.builder().build()));
+                        MetadataStoreConfig.builder()
+                                .metadataStoreName("metastore-" + 
getClass().getSimpleName())
+                                .build()));
     }
 
     /**
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index e2101268b09..645563eb78c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -70,7 +71,8 @@ public abstract class MockedBookKeeperTestCase {
     public final void setUp(Method method) throws Exception {
         LOG.info(">>>>>> starting {}", method);
         metadataStore = new FaultInjectionMetadataStore(
-                MetadataStoreExtended.create("memory:local", 
MetadataStoreConfig.builder().build()));
+                MetadataStoreExtended.create("memory:local",
+                        
MetadataStoreConfig.builder().metadataStoreName("metastore-" + 
method.getName()).build()));
 
         try {
             // start bookkeeper service
@@ -102,7 +104,11 @@ public abstract class MockedBookKeeperTestCase {
         }
         try {
             LOG.info("@@@@@@@@@ stopping " + method);
-            factory.shutdownAsync().get(10, TimeUnit.SECONDS);
+            try {
+                factory.shutdownAsync().get(10, TimeUnit.SECONDS);
+            } catch 
(ManagedLedgerException.ManagedLedgerFactoryClosedException e) {
+                // ignore
+            }
             factory = null;
             stopBookKeeper();
             metadataStore.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index c927a2e61d8..2e28ea8e70e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -589,7 +589,8 @@ public class PulsarTestContext implements AutoCloseable {
                 } else {
                     try {
                         MetadataStoreExtended store = 
MetadataStoreFactoryImpl.createExtended("memory:local",
-                                MetadataStoreConfig.builder().build());
+                                MetadataStoreConfig.builder()
+                                        
.metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
                         registerCloseable(() -> {
                             store.close();
                             resetSpyOrMock(store);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 7bc881c1254..9ba2588a07c 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -89,7 +89,9 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     protected abstract CompletableFuture<Boolean> existsFromStore(String path);
 
     protected AbstractMetadataStore(String metadataStoreName) {
-        this.executor = new ScheduledThreadPoolExecutor(1, new 
DefaultThreadFactory(metadataStoreName));
+        this.executor = new ScheduledThreadPoolExecutor(1,
+                new DefaultThreadFactory(
+                        StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName()));
         registerListener(this);
 
         this.childrenCache = Caffeine.newBuilder()
diff --git 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
index c681a1f0764..9a8e3ef5a2d 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/BookKeeperClusterTestCase.java
@@ -238,7 +238,7 @@ public abstract class BookKeeperClusterTestCase {
         zkc = zkUtil.getZooKeeperClient();
         metadataStore = new FaultInjectionMetadataStore(
                 
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
-                MetadataStoreConfig.builder().build()));
+                MetadataStoreConfig.builder().metadataStoreName("metastore-" + 
getClass().getSimpleName()).build()));
     }
 
     /**
diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
index e0b10ca0280..ac5aa3bd892 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -71,7 +71,9 @@ public abstract class MockedBookKeeperTestCase {
     public void setUp(Method method) throws Exception {
         LOG.info(">>>>>> starting {}", method);
         metadataStore = new 
FaultInjectionMetadataStore(MetadataStoreExtended.create("memory:local",
-                MetadataStoreConfig.builder().build()));
+                MetadataStoreConfig.builder()
+                        .metadataStoreName("metastore-" + method.getName())
+                        .build()));
         try {
             // start bookkeeper service
             startBookKeeper();

Reply via email to