AnonHxy commented on code in PR #18265: URL: https://github.com/apache/pulsar/pull/18265#discussion_r1023716092
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java: ########## @@ -18,40 +18,365 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.metadata.api.Stat; /** * Working in progress until <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished. - * Currently, it works nothing different with ManagedLedgerImpl. */ @Slf4j public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { private final TopicName shadowSource; private final String sourceMLName; + private volatile Stat sourceLedgersStat; Review Comment: It seems that this variable is unused ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java: ########## @@ -335,7 +348,7 @@ public TransportCnx getCnx() { return this.cnx; } - private static final class MessagePublishContext implements PublishContext, Runnable { + private static final class MessagePublishContext implements PublishContext, Runnable, Position { Review Comment: OK. I see. Should we write some comments to explain this? ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java: ########## @@ -18,40 +18,365 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.metadata.api.Stat; /** * Working in progress until <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished. - * Currently, it works nothing different with ManagedLedgerImpl. */ @Slf4j public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { private final TopicName shadowSource; private final String sourceMLName; + private volatile Stat sourceLedgersStat; public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, String name, final Supplier<Boolean> mlOwnershipChecker) { super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); - this.shadowSource = TopicName.get(config.getShadowSource()); - this.sourceMLName = shadowSource.getPersistenceNamingEncoding(); + if (config.getTopicName().isPartitioned() && TopicName.getPartitionIndex(config.getShadowSource()) == -1) { + this.shadowSource = + TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex()); + } else { + this.shadowSource = TopicName.get(config.getShadowSource()); + } + this.sourceMLName = + shadowSource.getPersistenceNamingEncoding(); } + /** + * ShadowManagedLedger init steps: + * 1. this.initialize : read source managedLedgerInfo + * 2. super.initialize : read its own read source managedLedgerInfo + * 3. this.initializeBookKeeper + * 4. super.initializeCursors + * @param callback + * @param ctx + */ @Override synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { - // TODO: ShadowManagedLedger has different initialize process from normal ManagedLedger, - // which is complicated and will be implemented in the next PRs. - super.initialize(callback, ctx); + log.info("Opening shadow managed ledger {} with source={}", name, sourceMLName); + + executor.executeOrdered(name, safeRun(() -> doInitialize(callback, ctx))); + } + + private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { + // Fetch the list of existing ledgers in the source managed ledger + store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) -> + executor.executeOrdered(name, safeRun(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat))) + ); + store.getManagedLedgerInfo(sourceMLName, false, null, new MetaStore.MetaStoreCallback<>() { + @Override + public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Source ML info:{}", name, sourceMLName, mlInfo); + } + sourceLedgersStat = stat; + // Fails if init with empty ledger. Very small chance here, since shadow topic is + // created when source topic exists. + if (mlInfo.getLedgerInfoCount() == 0) { + log.warn("[{}] Source topic ledger list is empty! source={},mlInfo={},stat={}", name, sourceMLName, + mlInfo, stat); +// callback.initializeFailed(new ManagedLedgerException.ManagedLedgerSourceNotReadyException( Review Comment: Cleanup this commented code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
