Jason918 commented on code in PR #18265: URL: https://github.com/apache/pulsar/pull/18265#discussion_r1032413814
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java: ########## @@ -18,40 +18,363 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.metadata.api.Stat; /** * Working in progress until <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished. - * Currently, it works nothing different with ManagedLedgerImpl. 
*/ @Slf4j public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { private final TopicName shadowSource; private final String sourceMLName; + private volatile Stat sourceLedgersStat; public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, String name, final Supplier<Boolean> mlOwnershipChecker) { super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); - this.shadowSource = TopicName.get(config.getShadowSource()); - this.sourceMLName = shadowSource.getPersistenceNamingEncoding(); + if (config.getTopicName().isPartitioned() && TopicName.getPartitionIndex(config.getShadowSource()) == -1) { + this.shadowSource = + TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex()); + } else { + this.shadowSource = TopicName.get(config.getShadowSource()); + } + this.sourceMLName = + shadowSource.getPersistenceNamingEncoding(); } + /** + * ShadowManagedLedger init steps: + * 1. this.initialize : read source managedLedgerInfo + * 2. super.initialize : read its own read source managedLedgerInfo + * 3. this.initializeBookKeeper + * 4. super.initializeCursors + * @param callback + * @param ctx + */ @Override synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { - // TODO: ShadowManagedLedger has different initialize process from normal ManagedLedger, - // which is complicated and will be implemented in the next PRs. 
- super.initialize(callback, ctx); + log.info("Opening shadow managed ledger {} with source={}", name, sourceMLName); + + executor.executeOrdered(name, safeRun(() -> doInitialize(callback, ctx))); + } + + private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { + // Fetch the list of existing ledgers in the source managed ledger + store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) -> + executor.executeOrdered(name, safeRun(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat))) + ); + store.getManagedLedgerInfo(sourceMLName, false, null, new MetaStore.MetaStoreCallback<>() { + @Override + public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Source ML info:{}", name, sourceMLName, mlInfo); + } + sourceLedgersStat = stat; + if (mlInfo.getLedgerInfoCount() == 0) { + // Small chance here, since shadow topic is created after source topic exists. + log.warn("[{}] Source topic ledger list is empty! 
source={},mlInfo={},stat={}", name, sourceMLName, + mlInfo, stat); + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + return; + } + + if (mlInfo.hasTerminatedPosition()) { + state = State.Terminated; + lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); + log.info("[{}][{}] Recovering managed ledger terminated at {}", name, sourceMLName, + lastConfirmedEntry); + } + + for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { + ledgers.put(ls.getLedgerId(), ls); + } + + final long lastLedgerId = ledgers.lastKey(); + mbean.startDataLedgerOpenOp(); + AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> { + mbean.endDataLedgerOpenOp(); + if (log.isDebugEnabled()) { + log.debug("[{}] Opened source ledger {}", name, lastLedgerId); + } + if (rc == BKException.Code.OK) { + LedgerInfo info = + LedgerInfo.newBuilder() + .setLedgerId(lastLedgerId) + .setEntries(lh.getLastAddConfirmed() + 1) + .setSize(lh.getLength()) + .setTimestamp(clock.millis()).build(); + ledgers.put(lastLedgerId, info); + + //Always consider the last ledger is opened in source. 
+ STATE_UPDATER.set(ShadowManagedLedgerImpl.this, State.LedgerOpened); + currentLedger = lh; + + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh) + .thenRun(() -> ShadowManagedLedgerImpl.super.initialize(callback, ctx)) + .exceptionally(ex -> { + callback.initializeFailed( + new ManagedLedgerException.ManagedLedgerInterceptException( + ex.getCause())); + return null; + }); + } else { + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + } + } else if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Source ledger not found: {}", name, lastLedgerId); + ledgers.remove(lastLedgerId); + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + } else { + log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId, + BKException.getMessage(rc)); + callback.initializeFailed(createManagedLedgerException(rc)); + } + })); + //open ledger in readonly mode. + bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(), opencb, null); + + } + + @Override + public void operationFailed(ManagedLedgerException.MetaStoreException e) { + if (e instanceof ManagedLedgerException.MetadataNotFoundException) { + callback.initializeFailed(new ManagedLedgerException.ManagedLedgerNotFoundException(e)); + } else { + callback.initializeFailed(new ManagedLedgerException(e)); + } + } + }); } public TopicName getShadowSource() { return shadowSource; } + + @Override + protected boolean isLedgersReadonly() { + return true; + } + + @Override + protected synchronized void initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) { + if (log.isDebugEnabled()) { + log.debug("[{}] initializing bookkeeper for shadowManagedLedger; ledgers {}", name, ledgers); + } + + // Calculate total entries and size + Iterator<LedgerInfo> iterator = ledgers.values().iterator(); + while (iterator.hasNext()) { + LedgerInfo li = iterator.next(); + if (li.getEntries() > 0) { + 
NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries()); + TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize()); + } else if (li.getLedgerId() != currentLedger.getId()) { + //do not remove the last empty ledger. + iterator.remove(); + } + } + + initLastConfirmedEntry(); + // Save it back to ensure all nodes exist and properties are persisted. + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStore.MetaStoreCallback<>() { + @Override + public void operationComplete(Void result, Stat stat) { + ledgersStat = stat; + initializeCursors(callback); + } + + @Override + public void operationFailed(ManagedLedgerException.MetaStoreException e) { + handleBadVersion(e); + callback.initializeFailed(new ManagedLedgerException(e)); + } + }); + } + + private void initLastConfirmedEntry() { + if (lastConfirmedEntry != null || currentLedger == null) { + return; + } + lastConfirmedEntry = new PositionImpl(currentLedger.getId(), currentLedger.getLastAddConfirmed()); + // bypass empty ledgers, find last ledger with Message if possible. + while (lastConfirmedEntry.getEntryId() == -1) { + Map.Entry<Long, LedgerInfo> formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId()); + if (formerLedger != null) { + LedgerInfo ledgerInfo = formerLedger.getValue(); + lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1); + } else { + break; + } + } + } + + @Override + protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + if (!beforeAddEntry(addOperation)) { + return; + } + if (state == State.Terminated) { + addOperation.failed(new ManagedLedgerException.ManagedLedgerTerminatedException( + "Managed ledger was already terminated")); + return; + } Review Comment: Yes, exactly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org