This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 981aece75af [fix][ml] There are two same-named managed ledgers in the one broker (#18688)
981aece75af is described below
commit 981aece75af88701749857521b62058f0aaa140c
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jun 7 18:38:18 2023 +0800
[fix][ml] There are two same-named managed ledgers in the one broker (#18688)
(cherry picked from commit d7186a67fc13ae972a76fbd9ba59b96d8bc7daae)
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 16 +++++--
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 ++++
.../mledger/impl/ManagedLedgerFactoryTest.java | 50 ++++++++++++++++++----
3 files changed, 64 insertions(+), 11 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 6b323145653..b3680d9610c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -483,9 +483,19 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}
void close(ManagedLedger ledger) {
- // Remove the ledger from the internal factory cache
- ledgers.remove(ledger.getName());
- entryCacheManager.removeEntryCache(ledger.getName());
+ // If the future in map is not done or has exceptionally complete, it
means that @param-ledger is not in the
+ // map.
+ CompletableFuture<ManagedLedgerImpl> ledgerFuture =
ledgers.get(ledger.getName());
+ if (ledgerFuture == null || !ledgerFuture.isDone() ||
ledgerFuture.isCompletedExceptionally()){
+ return;
+ }
+ if (ledgerFuture.join() != ledger){
+ return;
+ }
+ // Remove the ledger from the internal factory cache.
+ if (ledgers.remove(ledger.getName(), ledgerFuture)) {
+ entryCacheManager.removeEntryCache(ledger.getName());
+ }
}
public CompletableFuture<Void> shutdownAsync() throws
ManagedLedgerException {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 07770142241..30aa852a515 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1423,6 +1423,15 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public synchronized void createComplete(int rc, final LedgerHandle lh,
Object ctx) {
+ if (STATE_UPDATER.get(this) == State.Closed) {
+ if (lh != null) {
+ log.warn("[{}] ledger create completed after the managed
ledger is closed rc={} ledger={}, so just"
+ + " close this ledger handle.", name, rc, lh != null ?
lh.getId() : -1);
+ lh.closeAsync();
+ }
+ return;
+ }
+
if (log.isDebugEnabled()) {
log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh !=
null ? lh.getId() : -1);
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
index 1381542e66a..9206bd6ae38 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryTest.java
@@ -19,23 +19,18 @@
package org.apache.bookkeeper.mledger.impl;
import static org.testng.Assert.assertEquals;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.mledger.Entry;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.List;
-
public class ManagedLedgerFactoryTest extends MockedBookKeeperTestCase {
@Test(timeOut = 20000)
@@ -79,4 +74,43 @@ public class ManagedLedgerFactoryTest extends
MockedBookKeeperTestCase {
assertEquals(mri.to.entryId, 0);
}
+ /**
+ * see: https://github.com/apache/pulsar/pull/18688
+ */
+ @Test
+ public void testConcurrentCloseLedgerAndSwitchLedgerForReproduceIssue()
throws Exception {
+ String managedLedgerName = "lg_" +
UUID.randomUUID().toString().replaceAll("-", "_");
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setThrottleMarkDelete(1);
+ config.setMaximumRolloverTime(Integer.MAX_VALUE, TimeUnit.SECONDS);
+ config.setMaxEntriesPerLedger(5);
+
+ // create managedLedger once and close it.
+ ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl)
factory.open(managedLedgerName, config);
+ waitManagedLedgerStateEquals(managedLedger1,
ManagedLedgerImpl.State.LedgerOpened);
+ managedLedger1.close();
+
+ // create managedLedger the second time.
+ ManagedLedgerImpl managedLedger2 = (ManagedLedgerImpl)
factory.open(managedLedgerName, config);
+ waitManagedLedgerStateEquals(managedLedger2,
ManagedLedgerImpl.State.LedgerOpened);
+
+ // Mock the task create ledger complete now, it will change the state
to another value which not is Closed.
+ // Close managedLedger1 the second time.
+ managedLedger1.createComplete(1, null, null);
+ managedLedger1.close();
+
+ // Verify managedLedger2 is still there.
+ Assert.assertFalse(factory.ledgers.isEmpty());
+
Assert.assertEquals(factory.ledgers.get(managedLedger2.getName()).join(),
managedLedger2);
+
+ // cleanup.
+ managedLedger2.close();
+ }
+
+ private void waitManagedLedgerStateEquals(ManagedLedgerImpl managedLedger,
ManagedLedgerImpl.State expectedStat){
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertTrue(managedLedger.getState() == expectedStat));
+ }
+
}