Jason918 commented on code in PR #18265:
URL: https://github.com/apache/pulsar/pull/18265#discussion_r1032413814


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,363 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153">PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;
 
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
                                    String name, final Supplier<Boolean> 
mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
-        this.shadowSource = TopicName.get(config.getShadowSource());
-        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+        if (config.getTopicName().isPartitioned() && 
TopicName.getPartitionIndex(config.getShadowSource()) == -1) {
+            this.shadowSource =
+                    
TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex());
+        } else {
+            this.shadowSource = TopicName.get(config.getShadowSource());
+        }
+        this.sourceMLName =
+                shadowSource.getPersistenceNamingEncoding();
     }
 
+    /**
+     * ShadowManagedLedger init steps:
+     * 1. this.initialize : read source managedLedgerInfo
+     * 2. super.initialize : read its own read source managedLedgerInfo
+     * 3. this.initializeBookKeeper
+     * 4. super.initializeCursors
+     * @param callback
+     * @param ctx
+     */
     @Override
     synchronized void initialize(ManagedLedgerInitializeLedgerCallback 
callback, Object ctx) {
-        // TODO: ShadowManagedLedger has different initialize process from 
normal ManagedLedger,
-        //  which is complicated and will be implemented in the next PRs.
-        super.initialize(callback, ctx);
+        log.info("Opening shadow managed ledger {} with source={}", name, 
sourceMLName);
+
+        executor.executeOrdered(name, safeRun(() -> doInitialize(callback, 
ctx)));
+    }
+
+    private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, 
Object ctx) {
+        // Fetch the list of existing ledgers in the source managed ledger
+        store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
+                executor.executeOrdered(name, safeRun(() -> 
processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
+        );
+        store.getManagedLedgerInfo(sourceMLName, false, null, new 
MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo 
mlInfo, Stat stat) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Source ML info:{}", name, 
sourceMLName, mlInfo);
+                }
+                sourceLedgersStat = stat;
+                if (mlInfo.getLedgerInfoCount() == 0) {
+                    // Small chance here, since shadow topic is created after 
source topic exists.
+                    log.warn("[{}] Source topic ledger list is empty! 
source={},mlInfo={},stat={}", name, sourceMLName,
+                            mlInfo, stat);
+                    ShadowManagedLedgerImpl.super.initialize(callback, ctx);
+                    return;
+                }
+
+                if (mlInfo.hasTerminatedPosition()) {
+                    state = State.Terminated;
+                    lastConfirmedEntry = new 
PositionImpl(mlInfo.getTerminatedPosition());
+                    log.info("[{}][{}] Recovering managed ledger terminated at 
{}", name, sourceMLName,
+                            lastConfirmedEntry);
+                }
+
+                for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
+                    ledgers.put(ls.getLedgerId(), ls);
+                }
+
+                final long lastLedgerId = ledgers.lastKey();
+                mbean.startDataLedgerOpenOp();
+                AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> 
executor.executeOrdered(name, safeRun(() -> {
+                    mbean.endDataLedgerOpenOp();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Opened source ledger {}", name, 
lastLedgerId);
+                    }
+                    if (rc == BKException.Code.OK) {
+                        LedgerInfo info =
+                                LedgerInfo.newBuilder()
+                                        .setLedgerId(lastLedgerId)
+                                        .setEntries(lh.getLastAddConfirmed() + 
1)
+                                        .setSize(lh.getLength())
+                                        .setTimestamp(clock.millis()).build();
+                        ledgers.put(lastLedgerId, info);
+
+                        //Always consider the last ledger is opened in source.
+                        STATE_UPDATER.set(ShadowManagedLedgerImpl.this, 
State.LedgerOpened);
+                        currentLedger = lh;
+
+                        if (managedLedgerInterceptor != null) {
+                            
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh)
+                                    .thenRun(() -> 
ShadowManagedLedgerImpl.super.initialize(callback, ctx))
+                                    .exceptionally(ex -> {
+                                        callback.initializeFailed(
+                                                new 
ManagedLedgerException.ManagedLedgerInterceptException(
+                                                        ex.getCause()));
+                                        return null;
+                                    });
+                        } else {
+                            ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                        }
+                    } else if (isNoSuchLedgerExistsException(rc)) {
+                        log.warn("[{}] Source ledger not found: {}", name, 
lastLedgerId);
+                        ledgers.remove(lastLedgerId);
+                        ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                    } else {
+                        log.error("[{}] Failed to open source ledger {}: {}", 
name, lastLedgerId,
+                                BKException.getMessage(rc));
+                        
callback.initializeFailed(createManagedLedgerException(rc));
+                    }
+                }));
+                //open ledger in readonly mode.
+                bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, 
config.getPassword(), opencb, null);
+
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                if (e instanceof 
ManagedLedgerException.MetadataNotFoundException) {
+                    callback.initializeFailed(new 
ManagedLedgerException.ManagedLedgerNotFoundException(e));
+                } else {
+                    callback.initializeFailed(new ManagedLedgerException(e));
+                }
+            }
+        });
     }
 
     public TopicName getShadowSource() {
         return shadowSource;
     }
+
+    @Override
+    protected boolean isLedgersReadonly() {
+        return true;
+    }
+
+    @Override
+    protected synchronized void 
initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] initializing bookkeeper for shadowManagedLedger; 
ledgers {}", name, ledgers);
+        }
+
+        // Calculate total entries and size
+        Iterator<LedgerInfo> iterator = ledgers.values().iterator();
+        while (iterator.hasNext()) {
+            LedgerInfo li = iterator.next();
+            if (li.getEntries() > 0) {
+                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries());
+                TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
+            } else if (li.getLedgerId() != currentLedger.getId()) {
+                //do not remove the last empty ledger.
+                iterator.remove();
+            }
+        }
+
+        initLastConfirmedEntry();
+        // Save it back to ensure all nodes exist and properties are persisted.
+        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, 
new MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(Void result, Stat stat) {
+                ledgersStat = stat;
+                initializeCursors(callback);
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                handleBadVersion(e);
+                callback.initializeFailed(new ManagedLedgerException(e));
+            }
+        });
+    }
+
+    private void initLastConfirmedEntry() {
+        if (lastConfirmedEntry != null || currentLedger == null) {
+            return;
+        }
+        lastConfirmedEntry = new PositionImpl(currentLedger.getId(), 
currentLedger.getLastAddConfirmed());
+        // bypass empty ledgers, find last ledger with Message if possible.
+        while (lastConfirmedEntry.getEntryId() == -1) {
+            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+            if (formerLedger != null) {
+                LedgerInfo ledgerInfo = formerLedger.getValue();
+                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{
+        if (!beforeAddEntry(addOperation)) {
+            return;
+        }
+        if (state == State.Terminated) {
+            addOperation.failed(new 
ManagedLedgerException.ManagedLedgerTerminatedException(
+                    "Managed ledger was already terminated"));
+            return;
+        }

Review Comment:
   Yes, exactly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to