This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0dc2cede6c76d23e02de039961977e38314a8d20 Author: LinChen <[email protected]> AuthorDate: Fri Jul 8 21:36:12 2022 +0800 [fix][broker] fix No such ledger exception (#16420) * fix No such ledger exception * move currentLedgerEntries and currentLedgerSize * move ledgers.put to operationComplete * fix ledgers.put * use ledgersTmp to write zookeeper, after success, update ledgers * update updateLedgersListAfterRollover * put lh to ledgersTmp * extend buildManagedLedgerInfo * getManagedLedgerInfo(newLedger) after get metadataMutex (cherry picked from commit 2e5fbbc09af28d1ba4d159ee0cdb71ec7735bd29) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 32 ++++++++++++++-------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 44e0ed96a25..3a35d71736e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1417,11 +1417,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { lastLedgerCreationFailureTimestamp = clock.millis(); } else { log.info("[{}] Created new ledger {}", name, lh.getId()); - ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); - currentLedger = lh; - currentLedgerEntries = 0; - currentLedgerSize = 0; - + LedgerInfo newLedger = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); final MetaStoreCallback<Void> cb = new MetaStoreCallback<Void>() { @Override public void operationComplete(Void v, Stat stat) { @@ -1429,6 +1425,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.debug("[{}] Updating of ledgers list after create complete. 
version={}", name, stat); } ledgersStat = stat; + ledgers.put(lh.getId(), newLedger); + currentLedger = lh; + currentLedgerEntries = 0; + currentLedgerSize = 0; metadataMutex.unlock(); updateLedgersIdsComplete(stat); synchronized (ManagedLedgerImpl.this) { @@ -1443,8 +1443,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void operationFailed(MetaStoreException e) { log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage()); - // Remove the ledger, since we failed to update the list - ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> { mbean.endDataLedgerDeleteOp(); @@ -1479,21 +1477,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } }; - updateLedgersListAfterRollover(cb); + updateLedgersListAfterRollover(cb, newLedger); } } - - private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) { + private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) { if (!metadataMutex.tryLock()) { // Defer update for later - scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS); + scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback, newLedger), + 100, TimeUnit.MILLISECONDS); return; } if (log.isDebugEnabled()) { log.debug("[{}] Updating ledgers ids with new ledger. 
version={}", name, ledgersStat); } - store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, callback); + ManagedLedgerInfo mlInfo = getManagedLedgerInfo(newLedger); + store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback); } public synchronized void updateLedgersIdsComplete(Stat stat) { @@ -3483,8 +3482,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return mlInfo.build(); } + private ManagedLedgerInfo getManagedLedgerInfo(LedgerInfo newLedger) { + ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()) + .addLedgerInfo(newLedger); + return buildManagedLedgerInfo(mlInfo); + } private ManagedLedgerInfo buildManagedLedgerInfo(Map<Long, LedgerInfo> ledgers) { ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()); + return buildManagedLedgerInfo(mlInfo); + } + + private ManagedLedgerInfo buildManagedLedgerInfo(ManagedLedgerInfo.Builder mlInfo) { if (state == State.Terminated) { mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId()) .setEntryId(lastConfirmedEntry.getEntryId()));
