This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8945b839b0aa46568d738c12daeef8a259f9cc3d Author: fengyubiao <[email protected]> AuthorDate: Tue Feb 21 21:47:28 2023 +0800 [fix] [ml] topic load fail by ledger lost (#19444) Ensures that only ledgers that have already been removed from the ledger-info metadata can be deleted from BookKeeper (BK). (cherry picked from commit 3314d70231cbed89b6eefa2073ef8c048d84ec16) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 29 ++++-- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 116 ++++++++++++++++++++- 2 files changed, 134 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 012ef388010..14967728214 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -68,6 +68,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import javax.annotation.Nullable; import lombok.Getter; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; @@ -152,7 +153,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected final BookKeeper bookKeeper; protected final String name; private final Map<String, byte[]> ledgerMetadata; - private final BookKeeper.DigestType digestType; + protected final BookKeeper.DigestType digestType; protected ManagedLedgerConfig config; protected Map<String, String> propertiesMap; @@ -445,6 +446,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } // Calculate total entries and size + final List<Long> emptyLedgersToBeDeleted = Collections.synchronizedList(new ArrayList<>()); Iterator<LedgerInfo> iterator = ledgers.values().iterator(); while 
(iterator.hasNext()) { LedgerInfo li = iterator.next(); @@ -453,9 +455,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize()); } else { iterator.remove(); - bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> { - log.info("[{}] Deleted empty ledger ledgerId={} rc={}", name, li.getLedgerId(), rc); - }, null); + emptyLedgersToBeDeleted.add(li.getLedgerId()); } } @@ -471,6 +471,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationComplete(Void v, Stat stat) { ledgersStat = stat; + emptyLedgersToBeDeleted.forEach(ledgerId -> { + bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { + log.info("[{}] Deleted empty ledger ledgerId={} rc={}", name, ledgerId, rc); + }, null); + }); initializeCursors(callback); } @@ -1503,11 +1508,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } ledgersStat = stat; synchronized (ManagedLedgerImpl.this) { + LedgerHandle originalCurrentLedger = currentLedger; ledgers.put(lh.getId(), newLedger); currentLedger = lh; currentLedgerEntries = 0; currentLedgerSize = 0; - updateLedgersIdsComplete(stat); + updateLedgersIdsComplete(originalCurrentLedger); mbean.addLedgerSwitchLatencySample(System.currentTimeMillis() - lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS); } @@ -1578,8 +1584,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback); } - public synchronized void updateLedgersIdsComplete(Stat stat) { + public synchronized void updateLedgersIdsComplete(@Nullable LedgerHandle originalCurrentLedger) { STATE_UPDATER.set(this, State.LedgerOpened); + // Delete original "currentLedger" if it has been removed from "ledgers". 
+ if (originalCurrentLedger != null && !ledgers.containsKey(originalCurrentLedger.getId())){ + bookKeeper.asyncDeleteLedger(originalCurrentLedger.getId(), (rc, ctx) -> { + mbean.endDataLedgerDeleteOp(); + log.info("[{}] Delete complete for empty ledger {}. rc={}", name, originalCurrentLedger.getId(), rc); + }, null); + } updateLastLedgerCreatedTimeAndScheduleRolloverTask(); if (log.isDebugEnabled()) { @@ -1657,10 +1670,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // The last ledger was empty, so we can discard it ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); - bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> { - mbean.endDataLedgerDeleteOp(); - log.info("[{}] Delete complete for empty ledger {}. rc={}", name, lh.getId(), rc); - }, null); } trimConsumedLedgersInBackground(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 68e72786be7..a6b8fa17bee 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -48,6 +48,7 @@ import java.lang.reflect.Field; import java.nio.ReadOnlyBufferException; import java.nio.charset.Charset; import java.security.GeneralSecurityException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -74,7 +75,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import lombok.Cleanup; +import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -144,6 +147,117 
@@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } + private void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean addEntryFinished) throws Exception { + LedgerHandle currentLedger = ml.currentLedger; + final LedgerHandle spyLedgerHandle = spy(currentLedger); + doAnswer(invocation -> { + ByteBuf bs = (ByteBuf) invocation.getArguments()[0]; + AddCallback addCallback = (AddCallback) invocation.getArguments()[1]; + Object originalContext = invocation.getArguments()[2]; + currentLedger.asyncAddEntry(bs, (rc, lh, entryId, ctx) -> { + addEntryFinished.set(true); + addCallback.addComplete(BKException.Code.TimeoutException, spyLedgerHandle, -1, ctx); + }, originalContext); + return null; + }).when(spyLedgerHandle).asyncAddEntry(any(ByteBuf.class), any(AddCallback.class), any()); + ml.currentLedger = spyLedgerHandle; + } + + @Data + private static class DeleteLedgerInfo{ + volatile boolean hasCalled; + volatile CompletableFuture<Void> future = new CompletableFuture<>(); + } + + private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final AtomicBoolean signal, + BookKeeper spyBookKeeper) { + DeleteLedgerInfo deleteLedgerInfo = new DeleteLedgerInfo(); + doAnswer(invocation -> { + long ledgerId = (long) invocation.getArguments()[0]; + AsyncCallback.DeleteCallback originalCb = (AsyncCallback.DeleteCallback) invocation.getArguments()[1]; + AsyncCallback.DeleteCallback cb = (rc, ctx) -> { + if (deleteLedgerInfo.hasCalled) { + deleteLedgerInfo.future.complete(null); + } + originalCb.deleteComplete(rc, ctx); + }; + Object ctx = invocation.getArguments()[2]; + if (ledgerId != ledger.getId()){ + bkc.asyncDeleteLedger(ledgerId, originalCb, ctx); + } else { + deleteLedgerInfo.hasCalled = true; + new Thread(() -> { + Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get); + bkc.asyncDeleteLedger(ledgerId, cb, ctx); + }).start(); + } + return null; + 
}).when(spyBookKeeper).asyncDeleteLedger(any(long.class), any(AsyncCallback.DeleteCallback.class), any()); + return deleteLedgerInfo; + } + + /*** + * This test simulates the following problems that can occur when ZK connections are unstable: + * - add entry timeout + * - write ZK fail when update ledger info of ML + * and verifies that ledger info of ML is still correct when the above problems occur. + */ + @Test + public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception { + String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut"; + BookKeeper spyBookKeeper = spy(bkc); + ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName); + + // Make add entry timeout(The data write was actually successful). + AtomicBoolean addEntryFinished = new AtomicBoolean(false); + makeAddEntryTimeout(ml, addEntryFinished); + + // Make the update operation of ledger info failure when switch ledger. + metadataStore.failConditional(new MetadataStoreException.BadVersionException(""), (opType, path) -> { + if (opType == FaultInjectionMetadataStore.OperationType.PUT && addEntryFinished.get() + && "/managed-ledgers/testLedgerInfoMetaCorrectIfAddEntryTimeOut".equals(path)) { + return true; + } + return false; + }); + + // Make delete ledger is delayed if delete is called. + AtomicBoolean deleteLedgerDelaySignal = new AtomicBoolean(false); + DeleteLedgerInfo deleteLedgerInfo = + makeDelayIfDoLedgerDelete(ml.currentLedger, deleteLedgerDelaySignal, spyBookKeeper); + + // Add one entry. + // - it will fail and trigger ledger switch(we mocked the error). + // - ledger switch will also fail(we mocked the error). + try { + ml.addEntry("1".getBytes(Charset.defaultCharset())); + fail("Expected the operation of add entry will fail by timeout or ledger fenced."); + } catch (Exception e){ + // expected ex. + } + + // Reopen ML. 
+ try { + ml.close(); + fail("Expected the operation of ml close will fail by fenced state."); + } catch (Exception e){ + // expected ex. + } + ManagedLedgerImpl mlReopened = (ManagedLedgerImpl) factory.open(mlName); + deleteLedgerDelaySignal.set(true); + if (deleteLedgerInfo.hasCalled){ + deleteLedgerInfo.future.join(); + } + mlReopened.close(); + + // verify: all ledgers in ledger info is worked. + for (long ledgerId : mlReopened.getLedgersInfo().keySet()){ + LedgerHandle lh = bkc.openLedger(ledgerId, ml.digestType, ml.getConfig().getPassword()); + lh.close(); + } + } + @Test public void managedLedgerApi() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); @@ -3124,7 +3238,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.pendingAddEntries.add(op); } - ledger.updateLedgersIdsComplete(mock(Stat.class)); + ledger.updateLedgersIdsComplete(null); for (int i = 0; i < 10; i++) { OpAddEntry oldOp = oldOps.get(i); if (i > 4) {
