horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r947000060


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2570,70 +2651,107 @@ void internalTrimLedgers(boolean isTruncate, 
CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
-            // Update metadata
-            for (LedgerInfo ls : ledgersToDelete) {
-                if (currentLastConfirmedEntry != null && ls.getLedgerId() == 
currentLastConfirmedEntry.getLedgerId()) {
-                    // this info is relevant because the lastMessageId won't 
be available anymore
-                    log.info("[{}] Ledger {} contains the current last 
confirmed entry {}, and it is going to be "
-                             + "deleted", name, ls.getLedgerId(), 
currentLastConfirmedEntry);
-                }
-
-                invalidateReadHandle(ls.getLedgerId());
-
-                ledgers.remove(ls.getLedgerId());
-                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
-                TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
 
-                entryCache.invalidateAllEntries(ls.getLedgerId());
-            }
-            for (LedgerInfo ls : offloadedLedgersToDelete) {
-                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
-                String driverName = OffloadUtils.getOffloadDriverName(ls,
-                        config.getLedgerOffloader().getOffloadDriverName());
-                Map<String, String> driverMetadata = 
OffloadUtils.getOffloadDriverMetadata(ls,
-                        
config.getLedgerOffloader().getOffloadDriverMetadata());
-                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, 
driverName, driverMetadata);
-                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
-            }
+            // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers =
+                    Stream.concat(ledgersToDelete.stream().filter(ls -> 
!ls.getOffloadContext().getBookkeeperDeleted()),
+                            
offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())
+                    .map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            CompletableFuture<?> appendDeleteLedgerFuture =
+                    appendPendingDeleteLedger(deletableLedgers, 
deletableOffloadedLedgers);
+            appendDeleteLedgerFuture.thenAccept(ignore -> {
+                believedDeleteIds.addAll(deletableLedgers);
+                for (LedgerInfo ls : ledgersToDelete) {
+                    if (currentLastConfirmedEntry != null
+                            && ls.getLedgerId() == 
currentLastConfirmedEntry.getLedgerId()) {
+                        // this info is relevant because the lastMessageId 
won't be available anymore
+                        log.info("[{}] Ledger {} contains the current last 
confirmed entry {}, and it is going to be "
+                                + "deleted", name, ls.getLedgerId(), 
currentLastConfirmedEntry);
+                    }
+                    invalidateReadHandle(ls.getLedgerId());
+                    ledgers.remove(ls.getLedgerId());
+                    entryCache.invalidateAllEntries(ls.getLedgerId());
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Updating of ledgers list after trimming", 
name);
-            }
+                    NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, 
-ls.getEntries());
+                    TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
+                }
+                for (LedgerInfo ls : offloadedLedgersToDelete) {
+                    LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
+                    
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                    String driverName = OffloadUtils.getOffloadDriverName(ls,
+                            
config.getLedgerOffloader().getOffloadDriverName());
+                    Map<String, String> driverMetadata = 
OffloadUtils.getOffloadDriverMetadata(ls,
+                            
config.getLedgerOffloader().getOffloadDriverMetadata());
+                    OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, 
driverName, driverMetadata);
+                    ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
+                }
 
-            store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), 
ledgersStat, new MetaStoreCallback<Void>() {
-                @Override
-                public void operationComplete(Void result, Stat stat) {
-                    log.info("[{}] End TrimConsumedLedgers. ledgers={} 
totalSize={}", name, ledgers.size(),
-                            TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
-                    ledgersStat = stat;
-                    metadataMutex.unlock();
-                    trimmerMutex.unlock();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Updating of ledgers list after trimming", 
name);
+                }
 
-                    for (LedgerInfo ls : ledgersToDelete) {
-                        log.info("[{}] Removing ledger {} - size: {}", name, 
ls.getLedgerId(), ls.getSize());
-                        asyncDeleteLedger(ls.getLedgerId(), ls);
-                    }
-                    for (LedgerInfo ls : offloadedLedgersToDelete) {
-                        log.info("[{}] Deleting offloaded ledger {} from 
bookkeeper - size: {}", name, ls.getLedgerId(),
-                                ls.getSize());
-                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
+                store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), 
ledgersStat, new MetaStoreCallback<Void>() {
+                    @Override
+                    public void operationComplete(Void result, Stat stat) {
+                        log.info("[{}] End TrimConsumedLedgers. ledgers={} 
totalSize={}", name, ledgers.size(),
+                                
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
+                        ledgersStat = stat;
+                        metadataMutex.unlock();
+                        trimmerMutex.unlock();
+                        if (ledgerDeletionService instanceof 
LedgerDeletionService.LedgerDeletionServiceDisable) {

Review Comment:
   good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to