Jason918 commented on code in PR #18265:
URL: https://github.com/apache/pulsar/pull/18265#discussion_r1032212220


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,363 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153";>PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;
 
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
                                    String name, final Supplier<Boolean> 
mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
-        this.shadowSource = TopicName.get(config.getShadowSource());
-        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+        if (config.getTopicName().isPartitioned() && 
TopicName.getPartitionIndex(config.getShadowSource()) == -1) {

Review Comment:
   > Why the `name` is not able to be used? It's a little confusing that we 
need to introduce the topic name in `ManagedLedgerConfig`
   
   Good point.
   
   1. We need to know the `TopicName` of the source topic in the shadow topic. 
But the `name` here is in the format of `PersistenceNamingEncoding`.
   2. I get that we'd better not introduce the concept of "topic" into 
managed-ledger. I will move this logic to the "broker" layer. So that the 
source topic name is in `properties` and the managedLedgerName of the source 
topic is stored in the new field `ManagedLedgerConfig.shadowSourceName`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to