wuzhanpeng commented on a change in pull request #13575:
URL: https://github.com/apache/pulsar/pull/13575#discussion_r792360487
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -4039,6 +4122,190 @@ public void clearBacklogFailed(ManagedLedgerException
exception, Object ctx) {
return statFuture;
}
+ /**
+ * During the execution of this method, lock {@code metadataMutex} needs
to be held
+ * because the {@code propertiesMap} would be updated (not thread-safe).
+ * @param deletableLedgerIds
+ */
+ @Override
+ public void markDeletableLedgers(Collection<Long> deletableLedgerIds,
+ Collection<Long>
deletableOffloadedLedgerIds) {
+ for (Long ledgerId : deletableLedgerIds) {
+ final String deletableLedgerMarker =
DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+ propertiesMap.put(deletableLedgerMarker,
DELETABLE_LEDGER_PLACEHOLDER);
+ }
+ for (Long ledgerId : deletableOffloadedLedgerIds) {
+ final String deletableOffloadedLedgerMarker =
DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+ // Offload context info is required in ledger cleanup, therefore
the serialized info object
+ // is kept in the propertiesMap until the ledger deletion is done
+ final String offloadedLedgerInfo =
BaseEncoding.base64().encode(ledgers.get(ledgerId).toByteArray());
+ propertiesMap.put(deletableOffloadedLedgerMarker,
offloadedLedgerInfo);
+ }
+ }
+
+ private Set<Long> getAllDeletableLedgers(String prefix) {
+ Set<Long> deletableLedgers = propertiesMap.keySet().stream()
+ .filter(k -> k.startsWith(prefix))
+ .map(k -> {
+ Long ledgerId =
Long.parseLong(k.substring(prefix.length()));
+ if (deletableLedgerRetryCounter.containsKey(ledgerId)
+ && deletableLedgerRetryCounter.get(ledgerId).get()
>= DEFAULT_LEDGER_DELETE_RETRIES) {
+ log.error("[{}] Cannot delete ledger:{} after {}
reties and now stop retrying on this broker",
+ name, ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+ return null;
+ }
+ return ledgerId;
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ if (!deletableLedgers.isEmpty()) {
+ return deletableLedgers;
+ }
+ return Sets.newHashSet();
+ }
+
+ @Override
+ public Set<Long> getAllDeletableLedgers() {
+ return getAllDeletableLedgers(DELETABLE_LEDGER_MARKER_PREFIX);
+ }
+
+ @Override
+ public Set<Long> getAllDeletableOffloadedLedgers() {
+ return
getAllDeletableLedgers(DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX);
+ }
+
+ /**
+ * During the execution of this method, lock {@code metadataMutex} needs
to be held
+ * because the {@code propertiesMap} would be updated (not thread-safe).
+ */
+ @Override
+ public void removeAllDeletableLedgers() {
+ Set<Long> deletableLedgers = getAllDeletableLedgers();
+ Set<Long> deletableOffloadedLedgers =
getAllDeletableOffloadedLedgers();
+ final CountDownLatch counter = new
CountDownLatch(deletableLedgers.size() + deletableOffloadedLedgers.size());
+
+ Set<Long> finishedDeletedLedgers = ConcurrentHashMap.newKeySet();
+ Set<Long> finishedDeletedOffloadedLedgers =
ConcurrentHashMap.newKeySet();
+ Set<Long> timeoutDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+ Set<Long> succeedDeletedLedgers = ConcurrentHashMap.newKeySet();
+ Set<Long> failDeletedLedgers = ConcurrentHashMap.newKeySet();
+
+ Set<Long> succeedDeletedOffloadedLedgers =
ConcurrentHashMap.newKeySet();
+ Set<Long> failDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet();
+
+ for (Long deletableLedger : deletableLedgers) {
+ asyncDeleteLedger(deletableLedger, DEFAULT_LEDGER_DELETE_RETRIES,
+ new DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ counter.countDown();
+ finishedDeletedLedgers.add(deletableLedger);
+ succeedDeletedLedgers.add(deletableLedger);
+ }
+
+ @Override
+ public void deleteLedgerFailed(ManagedLedgerException
exception, Object ctx) {
+ log.warn("[{}] Failed to delete bookkeeper
ledger:{} due to",
+ name, deletableLedger, exception);
+ counter.countDown();
+ finishedDeletedLedgers.add(deletableLedger);
+ failDeletedLedgers.add(deletableLedger);
+ }
+ }, 0, null);
+ }
+
+ for (Long deletableOffloadedLedger : deletableOffloadedLedgers) {
+ final String deletableOffloadedLedgerMarker =
+ DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX +
deletableOffloadedLedger;
+
+ try {
+ final LedgerInfo deletableOffloadedLedgerInfo =
LedgerInfo.parseFrom(
+
BaseEncoding.base64().decode(propertiesMap.get(deletableOffloadedLedgerMarker)));
+ asyncDeleteOffloadedLedger(deletableOffloadedLedger,
deletableOffloadedLedgerInfo,
+ DEFAULT_LEDGER_DELETE_RETRIES,
+ new DeleteLedgerCallback() {
+ @Override
+ public void deleteLedgerComplete(Object ctx) {
+ counter.countDown();
+
finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+
succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+ }
+
+ @Override
+ public void
deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}] Failed to delete offloaded
ledger:{} due to",
+ name, deletableOffloadedLedger,
exception);
+ counter.countDown();
+
finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+
failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+ }
+ });
+ } catch (Exception e) {
+ log.warn("[{}] Failed to retrieve offloaded ledger info of {}
due to",
+ name, deletableOffloadedLedger, e);
+ counter.countDown();
+ finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+ failDeletedOffloadedLedgers.add(deletableOffloadedLedger);
+ }
+ }
+
+ try {
+ if (!counter.await(AsyncOperationTimeoutSeconds,
TimeUnit.SECONDS)) {
+ for (Long deletableLedger : deletableLedgers) {
+ if (!finishedDeletedLedgers.contains(deletableLedger)) {
+ log.warn("[{}] Failed to delete ledger:{} due to
operation timeout", name, deletableLedger);
+ timeoutDeletedLedgers.add(deletableLedger);
+ }
+ }
+ for (Long deletableOffloadedLedger :
deletableOffloadedLedgers) {
+ if
(!finishedDeletedOffloadedLedgers.contains(deletableOffloadedLedger)) {
+ log.warn("[{}] Failed to delete offloaded ledger:{}
due to operation timeout",
+ name, deletableOffloadedLedger);
+ timeoutDeletedLedgers.add(deletableOffloadedLedger);
+ }
+ }
+ }
+
+ // remove markers after deleting ledgers
+ for (Long ledgerId : succeedDeletedLedgers) {
+ final String deletableLedgerMarker =
DELETABLE_LEDGER_MARKER_PREFIX + ledgerId;
+ propertiesMap.remove(deletableLedgerMarker);
+ }
+ for (Long ledgerId : succeedDeletedOffloadedLedgers) {
+ final String deletableLedgerMarker =
DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId;
+ propertiesMap.remove(deletableLedgerMarker);
+ }
+
+ // update retry count to track whether the max limit is reached
+ Set<Long> allFailedLedgers = new HashSet<>();
+ allFailedLedgers.addAll(failDeletedLedgers);
+ allFailedLedgers.addAll(failDeletedOffloadedLedgers);
+ allFailedLedgers.addAll(timeoutDeletedLedgers);
+
+ if (allFailedLedgers.isEmpty()) {
+ log.info("[{}] ledgers: {} and offloaded ledgers: {} are
deleted successfully.",
+ name, deletableLedgers, deletableOffloadedLedgers);
Review comment:
Those collection objects that inherit `java.util.AbstractCollection`
have the ability to return a string representation of this collection
(`java.util.AbstractCollection#toString`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]