This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new d76d4cba78b [fix][ml] There are two same-named managed ledgers in the
one broker (#18688)
d76d4cba78b is described below
commit d76d4cba78b4459cf757a05faaab551925fc3e2b
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jun 7 18:38:18 2023 +0800
[fix][ml] There are two same-named managed ledgers in the one broker
(#18688)
(cherry picked from commit d7186a67fc13ae972a76fbd9ba59b96d8bc7daae)
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 16 ++++++--
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +++++
.../mledger/impl/ManagedLedgerFactoryTest.java | 43 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 3 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index d7596a7468a..a2bc84728cc 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -485,9 +485,19 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}
void close(ManagedLedger ledger) {
- // Remove the ledger from the internal factory cache
- ledgers.remove(ledger.getName());
- entryCacheManager.removeEntryCache(ledger.getName());
+ // If the future in the map is not done or has completed exceptionally, it
means that the given ledger is not in the
+ // map.
+ CompletableFuture<ManagedLedgerImpl> ledgerFuture =
ledgers.get(ledger.getName());
+ if (ledgerFuture == null || !ledgerFuture.isDone() ||
ledgerFuture.isCompletedExceptionally()){
+ return;
+ }
+ if (ledgerFuture.join() != ledger){
+ return;
+ }
+ // Remove the ledger from the internal factory cache.
+ if (ledgers.remove(ledger.getName(), ledgerFuture)) {
+ entryCacheManager.removeEntryCache(ledger.getName());
+ }
}
public CompletableFuture<Void> shutdownAsync() throws
ManagedLedgerException {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 37174503469..745befe747c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1482,6 +1482,15 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public synchronized void createComplete(int rc, final LedgerHandle lh,
Object ctx) {
+ if (STATE_UPDATER.get(this) == State.Closed) {
+ if (lh != null) {
+ log.warn("[{}] ledger create completed after the managed
ledger is closed rc={} ledger={}, so just"
+ + " close this ledger handle.", name, rc, lh != null ?
lh.getId() : -1);
+ lh.closeAsync();
+ }
+ return;
+ }
+
if (log.isDebugEnabled()) {
log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh !=
null ? lh.getId() : -1);
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
index f307bfa72a5..4cbd5d204bf 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
@@ -20,12 +20,16 @@ package org.apache.bookkeeper.mledger.impl;
import static org.testng.Assert.assertEquals;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class ManagedLedgerFactoryTest extends MockedBookKeeperTestCase {
@@ -71,4 +75,43 @@ public class ManagedLedgerFactoryTest extends
MockedBookKeeperTestCase {
assertEquals(mri.to.entryId, 0);
}
+ /**
+ * see: https://github.com/apache/pulsar/pull/18688
+ */
+ @Test
+ public void testConcurrentCloseLedgerAndSwitchLedgerForReproduceIssue()
throws Exception {
+ String managedLedgerName = "lg_" +
UUID.randomUUID().toString().replaceAll("-", "_");
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setThrottleMarkDelete(1);
+ config.setMaximumRolloverTime(Integer.MAX_VALUE, TimeUnit.SECONDS);
+ config.setMaxEntriesPerLedger(5);
+
+ // create managedLedger once and close it.
+ ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl)
factory.open(managedLedgerName, config);
+ waitManagedLedgerStateEquals(managedLedger1,
ManagedLedgerImpl.State.LedgerOpened);
+ managedLedger1.close();
+
+ // create managedLedger the second time.
+ ManagedLedgerImpl managedLedger2 = (ManagedLedgerImpl)
factory.open(managedLedgerName, config);
+ waitManagedLedgerStateEquals(managedLedger2,
ManagedLedgerImpl.State.LedgerOpened);
+
+ // Mock the completion of the create-ledger task now; it will change the state
to a value other than Closed.
+ // Close managedLedger1 the second time.
+ managedLedger1.createComplete(1, null, null);
+ managedLedger1.close();
+
+ // Verify managedLedger2 is still there.
+ Assert.assertFalse(factory.ledgers.isEmpty());
+
Assert.assertEquals(factory.ledgers.get(managedLedger2.getName()).join(),
managedLedger2);
+
+ // cleanup.
+ managedLedger2.close();
+ }
+
+ private void waitManagedLedgerStateEquals(ManagedLedgerImpl managedLedger,
ManagedLedgerImpl.State expectedStat){
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertTrue(managedLedger.getState() == expectedStat));
+ }
+
}