This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new d76d4cba78b [fix][ml] There are two same-named managed ledgers in the 
one broker (#18688)
d76d4cba78b is described below

commit d76d4cba78b4459cf757a05faaab551925fc3e2b
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jun 7 18:38:18 2023 +0800

    [fix][ml] There are two same-named managed ledgers in the one broker 
(#18688)
    
    (cherry picked from commit d7186a67fc13ae972a76fbd9ba59b96d8bc7daae)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 16 ++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +++++
 .../mledger/impl/ManagedLedgerFactoryTest.java     | 43 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index d7596a7468a..a2bc84728cc 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -485,9 +485,19 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     }
 
     void close(ManagedLedger ledger) {
-        // Remove the ledger from the internal factory cache
-        ledgers.remove(ledger.getName());
-        entryCacheManager.removeEntryCache(ledger.getName());
+        // If the future in map is not done or has exceptionally complete, it 
means that @param-ledger is not in the
+        // map.
+        CompletableFuture<ManagedLedgerImpl> ledgerFuture = 
ledgers.get(ledger.getName());
+        if (ledgerFuture == null || !ledgerFuture.isDone() || 
ledgerFuture.isCompletedExceptionally()){
+            return;
+        }
+        if (ledgerFuture.join() != ledger){
+            return;
+        }
+        // Remove the ledger from the internal factory cache.
+        if (ledgers.remove(ledger.getName(), ledgerFuture)) {
+            entryCacheManager.removeEntryCache(ledger.getName());
+        }
     }
 
     public CompletableFuture<Void> shutdownAsync() throws 
ManagedLedgerException {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 37174503469..745befe747c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1482,6 +1482,15 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     @Override
     public synchronized void createComplete(int rc, final LedgerHandle lh, 
Object ctx) {
+        if (STATE_UPDATER.get(this) == State.Closed) {
+            if (lh != null) {
+                log.warn("[{}] ledger create completed after the managed 
ledger is closed rc={} ledger={}, so just"
+                        + " close this ledger handle.", name, rc, lh != null ? 
lh.getId() : -1);
+                lh.closeAsync();
+            }
+            return;
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != 
null ? lh.getId() : -1);
         }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
index f307bfa72a5..4cbd5d204bf 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
@@ -20,12 +20,16 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static org.testng.Assert.assertEquals;
 
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class ManagedLedgerFactoryTest extends MockedBookKeeperTestCase {
@@ -71,4 +75,43 @@ public class ManagedLedgerFactoryTest extends 
MockedBookKeeperTestCase {
         assertEquals(mri.to.entryId, 0);
     }
 
+    /**
+     * see: https://github.com/apache/pulsar/pull/18688
+     */
+    @Test
+    public void testConcurrentCloseLedgerAndSwitchLedgerForReproduceIssue() 
throws Exception {
+        String managedLedgerName = "lg_" + 
UUID.randomUUID().toString().replaceAll("-", "_");
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setThrottleMarkDelete(1);
+        config.setMaximumRolloverTime(Integer.MAX_VALUE, TimeUnit.SECONDS);
+        config.setMaxEntriesPerLedger(5);
+
+        // create managedLedger once and close it.
+        ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) 
factory.open(managedLedgerName, config);
+        waitManagedLedgerStateEquals(managedLedger1, 
ManagedLedgerImpl.State.LedgerOpened);
+        managedLedger1.close();
+
+        // create managedLedger the second time.
+        ManagedLedgerImpl managedLedger2 = (ManagedLedgerImpl) 
factory.open(managedLedgerName, config);
+        waitManagedLedgerStateEquals(managedLedger2, 
ManagedLedgerImpl.State.LedgerOpened);
+
+        // Mock the task create ledger complete now, it will change the state 
to another value which not is Closed.
+        // Close managedLedger1 the second time.
+        managedLedger1.createComplete(1, null, null);
+        managedLedger1.close();
+
+        // Verify managedLedger2 is still there.
+        Assert.assertFalse(factory.ledgers.isEmpty());
+        
Assert.assertEquals(factory.ledgers.get(managedLedger2.getName()).join(), 
managedLedger2);
+
+        // cleanup.
+        managedLedger2.close();
+    }
+
+    private void waitManagedLedgerStateEquals(ManagedLedgerImpl managedLedger, 
ManagedLedgerImpl.State expectedStat){
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(managedLedger.getState() == expectedStat));
+    }
+
 }

Reply via email to