This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 94b5f4c1243b84fe6beb642078ef298f2085d717 Author: hangc0276 <[email protected]> AuthorDate: Fri Jul 9 12:16:13 2021 +0800 simplify managedLedger retention trim logic (#11255) ### Motivation The current managedLedger retention trim logic is duplicated, so simplify it to a single retention check. The current tests already cover the retention logic. ### Modification 1. Simplify the retention check logic. (cherry picked from commit 4348f83b59431572137c0508adcaf493092e71e1) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 59 +++++++--------------- 1 file changed, 19 insertions(+), 40 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 021a7dd..1491239 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2289,25 +2289,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } private boolean hasLedgerRetentionExpired(long ledgerTimestamp) { - if (config.getRetentionTimeMillis() < 0) { - // Negative retention time equates to infinite retention - return false; - } - - long elapsedMs = clock.millis() - ledgerTimestamp; - return elapsedMs > config.getRetentionTimeMillis(); - } - - private boolean isLedgerRetentionOverSizeQuota() { - // Handle the -1 size limit as "infinite" size quota - return config.getRetentionSizeInMB() >= 0 - && TOTAL_SIZE_UPDATER.get(this) > config.getRetentionSizeInMB() * 1024 * 1024; + return config.getRetentionTimeMillis() >= 0 + && clock.millis() - ledgerTimestamp > config.getRetentionTimeMillis(); } - private boolean isLedgerRetentionOverSizeQuotaAfterDelete(long sizeToDelete) { + private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) { // Handle the -1 size limit as "infinite" size quota return 
config.getRetentionSizeInMB() >= 0 - && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete > config.getRetentionSizeInMB() * 1024 * 1024; + && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte; } private boolean isOffloadedNeedsDelete(OffloadContext offload) { @@ -2376,7 +2365,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId); } - long totalSizeToDelete = 0; boolean retentionSizeQuotaMet = false; // skip ledger if retention constraint met @@ -2392,40 +2380,31 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // if truncate, all ledgers besides currentLedger are going to be deleted if (isTruncate){ if (log.isDebugEnabled()) { - log.debug("[{}] Ledger {} has been truncated with ts {}", name, ls.getLedgerId(), ls.getTimestamp()); + log.debug("[{}] Ledger {} will be truncated with ts {}", + name, ls.getLedgerId(), ls.getTimestamp()); } ledgersToDelete.add(ls); continue; } - // if neither currentLedger nor truncate - if (!retentionSizeQuotaMet) { - totalSizeToDelete += ls.getSize(); - boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(); - boolean overRetentionQuotaAfterDelete = isLedgerRetentionOverSizeQuotaAfterDelete(totalSizeToDelete); - if (overRetentionQuota) { - if (overRetentionQuotaAfterDelete) { - if (log.isDebugEnabled()) { - log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId()); - } - ledgersToDelete.add(ls); - continue; - } else { - retentionSizeQuotaMet = true; - ledgersToDelete.add(ls); - continue; - } - } else { - retentionSizeQuotaMet = true; - } + totalSizeToDelete += ls.getSize(); + boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(totalSizeToDelete); + boolean expired = hasLedgerRetentionExpired(ls.getTimestamp()); + if (log.isDebugEnabled()) { + log.debug( + "[{}] Checking ledger {} -- time-old: {} sec -- " + + "expired: {} -- over-quota: {} -- 
current-ledger: {}", + name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired, + overRetentionQuota, currentLedger.getId()); } - if (hasLedgerRetentionExpired(ls.getTimestamp())) { + if (expired || overRetentionQuota) { if (log.isDebugEnabled()) { - log.debug("[{}] Ledger {} has expired, expired is {}, ts {}", name, ls.getLedgerId(), ls.getTimestamp()); + log.debug("[{}] Ledger {} has expired or over quota, expired is: {}, ts: {}, " + + "overRetentionQuota is: {}, ledge size: {}", + name, ls.getLedgerId(), expired, ls.getTimestamp(), overRetentionQuota, ls.getSize()); } ledgersToDelete.add(ls); - continue; } else { // once retention constraint has been met, skip check if (log.isDebugEnabled()) {
