Jason918 commented on code in PR #18265: URL: https://github.com/apache/pulsar/pull/18265#discussion_r1032212220
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java: ########## @@ -18,40 +18,363 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.metadata.api.Stat; /** * Working in progress until <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished. - * Currently, it works nothing different with ManagedLedgerImpl. 
 */ @Slf4j public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { private final TopicName shadowSource; private final String sourceMLName; + private volatile Stat sourceLedgersStat; public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, String name, final Supplier<Boolean> mlOwnershipChecker) { super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); - this.shadowSource = TopicName.get(config.getShadowSource()); - this.sourceMLName = shadowSource.getPersistenceNamingEncoding(); + if (config.getTopicName().isPartitioned() && TopicName.getPartitionIndex(config.getShadowSource()) == -1) { Review Comment: > Why the `name` is not able to be used? It's a little confusing that we need to introduce the topic name in `ManagedLedgerConfig` Good point. 1. We need to know the `TopicName` of the source topic in the shadow topic, but the `name` here is in the format of `PersistenceNamingEncoding`. 2. I get that we'd better not introduce the concept of "topic" into managed-ledger. I will move this logic to the "broker" layer. That way, the source topic name is kept in `properties`, and the managedLedgerName of the source topic is stored in the new field `ManagedLedgerConfig.shadowSourceName`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org