wuzhanpeng commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r792360487



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -4039,6 +4122,190 @@ public void clearBacklogFailed(ManagedLedgerException 
exception, Object ctx) {
         return statFuture;
     }
 
+    /**
+     * During the execution of this method, lock {@code metadataMutex} needs 
to be held
+     * because the {@code propertiesMap} would be updated (not thread-safe).
+     * @param deletableLedgerIds
+     */
+    @Override
+    public void markDeletableLedgers(Collection<Long> deletableLedgerIds,
+                                     Collection<Long> 
deletableOffloadedLedgerIds) {
+        for (Long ledgerId : deletableLedgerIds) {
+            final String deletableLedgerMarker = 
DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+            propertiesMap.put(deletableLedgerMarker, 
DELETABLE_LEDGER_PLACEHOLDER);
+        }
+        for (Long ledgerId : deletableOffloadedLedgerIds) {
+            final String deletableOffloadedLedgerMarker = 
DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+            // Offload context info is required in ledger cleanup, therefore 
the serialized info object
+            // is kept in the propertiesMap until the ledger deletion is done
+            final String offloadedLedgerInfo = 
BaseEncoding.base64().encode(ledgers.get(ledgerId).toByteArray());
+            propertiesMap.put(deletableOffloadedLedgerMarker, 
offloadedLedgerInfo);
+        }
+    }
+
+    private Set<Long> getAllDeletableLedgers(String prefix) {
+        Set<Long> deletableLedgers = propertiesMap.keySet().stream()
+                .filter(k -> k.startsWith(prefix))
+                .map(k -> {
+                    Long ledgerId = 
Long.parseLong(k.substring(prefix.length()));
+                    if (deletableLedgerRetryCounter.containsKey(ledgerId)
+                            && deletableLedgerRetryCounter.get(ledgerId).get() 
>= DEFAULT_LEDGER_DELETE_RETRIES) {
+                        log.error("[{}] Cannot delete ledger:{} after {} 
reties and now stop retrying on this broker",
+                                name, ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+                        return null;
+                    }
+                    return ledgerId;
+                })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        if (!deletableLedgers.isEmpty()) {
+            return deletableLedgers;
+        }
+        return Sets.newHashSet();
+    }
+
+    @Override
+    public Set<Long> getAllDeletableLedgers() {
+        return getAllDeletableLedgers(DELETABLE_LEDGER_MARKER_PREFIX);
+    }
+
+    @Override
+    public Set<Long> getAllDeletableOffloadedLedgers() {
+        return 
getAllDeletableLedgers(DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX);
+    }
+
+    /**
+     * During the execution of this method, lock {@code metadataMutex} needs 
to be held
+     * because the {@code propertiesMap} would be updated (not thread-safe).
+     */
+    @Override
+    public void removeAllDeletableLedgers() {
+        Set<Long> deletableLedgers = getAllDeletableLedgers();
+        Set<Long> deletableOffloadedLedgers = 
getAllDeletableOffloadedLedgers();
+        final CountDownLatch counter = new 
CountDownLatch(deletableLedgers.size() + deletableOffloadedLedgers.size());
+
+        Set<Long> finishedDeletedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> finishedDeletedOffloadedLedgers = 
ConcurrentHashMap.newKeySet();
+        Set<Long> timeoutDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+        Set<Long> succeedDeletedLedgers = ConcurrentHashMap.newKeySet();
+        Set<Long> failDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+        Set<Long> succeedDeletedOffloadedLedgers = 
ConcurrentHashMap.newKeySet();
+        Set<Long> failDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+
+        for (Long deletableLedger : deletableLedgers) {
+            asyncDeleteLedger(deletableLedger, DEFAULT_LEDGER_DELETE_RETRIES,
+                    new DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            counter.countDown();
+                            finishedDeletedLedgers.add(deletableLedger);
+                            succeedDeletedLedgers.add(deletableLedger);
+                        }
+
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                            log.warn("[{}] Failed to delete bookkeeper 
ledger:{} due to",
+                                    name, deletableLedger, exception);
+                            counter.countDown();
+                            finishedDeletedLedgers.add(deletableLedger);
+                            failDeletedLedgers.add(deletableLedger);
+                        }
+                    }, 0, null);
+        }
+
+        for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
+            final String deletableOffloadedLedgerMarker =
+                    DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + 
deletableOffloadedLedger;
+
+            try {
+                final LedgerInfo deletableOffloadedLedgerInfo = 
LedgerInfo.parseFrom(
+                        
BaseEncoding.base64().decode(propertiesMap.get(deletableOffloadedLedgerMarker)));
+                asyncDeleteOffloadedLedger(deletableOffloadedLedger, 
deletableOffloadedLedgerInfo,
+                        DEFAULT_LEDGER_DELETE_RETRIES,
+                        new DeleteLedgerCallback() {
+                            @Override
+                            public void deleteLedgerComplete(Object ctx) {
+                                counter.countDown();
+                                
finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                                
succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                            }
+
+                            @Override
+                            public void 
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                log.warn("[{}] Failed to delete offloaded 
ledger:{} due to",
+                                        name, deletableOffloadedLedger, 
exception);
+                                counter.countDown();
+                                
finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                                
failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                            }
+                        });
+            } catch (Exception e) {
+                log.warn("[{}] Failed to retrieve offloaded ledger info of {} 
due to",
+                        name, deletableOffloadedLedger, e);
+                counter.countDown();
+                finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+                failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+            }
+        }
+
+        try {
+            if (!counter.await(AsyncOperationTimeoutSeconds, 
TimeUnit.SECONDS)) {
+                for (Long deletableLedger : deletableLedgers) {
+                    if (!finishedDeletedLedgers.contains(deletableLedger)) {
+                        log.warn("[{}] Failed to delete ledger:{} due to 
operation timeout", name, deletableLedger);
+                        timeoutDeletedLedgers.add(deletableLedger);
+                    }
+                }
+                for (Long deletableOffloadedLedger : 
deletableOffloadedLedgers) {
+                    if 
(!finishedDeletedOffloadedLedgers.contains(deletableOffloadedLedger)) {
+                        log.warn("[{}] Failed to delete offloaded ledger:{} 
due to operation timeout",
+                                name, deletableOffloadedLedger);
+                        timeoutDeletedLedgers.add(deletableOffloadedLedger);
+                    }
+                }
+            }
+
+            // remove markers after deleting ledgers
+            for (Long ledgerId : succeedDeletedLedgers) {
+                final String deletableLedgerMarker = 
DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+                propertiesMap.remove(deletableLedgerMarker);
+            }
+            for (Long ledgerId : succeedDeletedOffloadedLedgers) {
+                final String deletableLedgerMarker = 
DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+                propertiesMap.remove(deletableLedgerMarker);
+            }
+
+            // update retry count to track whether the max limit is reached
+            Set<Long> allFailedLedgers = new HashSet<>();
+            allFailedLedgers.addAll(failDeletedLedgers);
+            allFailedLedgers.addAll(failDeletedOffloadedLedgers);
+            allFailedLedgers.addAll(timeoutDeletedLedgers);
+
+            if (allFailedLedgers.isEmpty()) {
+                log.info("[{}] ledgers: {} and offloaded ledgers: {} are 
deleted successfully.",
+                        name, deletableLedgers, deletableOffloadedLedgers);

Review comment:
       Collections that extend `java.util.AbstractCollection` already return a 
readable string representation of their elements 
(`java.util.AbstractCollection#toString`), so these sets can be passed to the 
logger directly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to