This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3ad707b40dcfb5240322c12e229375d53ab9001f Author: Aloys <[email protected]> AuthorDate: Thu Jul 8 12:21:24 2021 +0800 Fix retention size policy delete too much ledgers (#11242) Fixes #11241 ### Motivation This pull request fixes an error where the retention size policy deletes too many ledgers when none of the cursors has any backlog. ### Modifications 1. Generate `ledgersToDelete` by checking ledger sizes one by one instead of relying on `TOTAL_SIZE_UPDATER` alone 2. Refactor the logic that generates `ledgersToDelete` 3. Add test code (cherry picked from commit 8ca6f193afa5194ef49130ff1a7b871ecb00c828) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 73 +++++++++++++++++----- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 44 +++++++++++++ 2 files changed, 100 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9a5cea0..021a7dd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2304,6 +2304,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { && TOTAL_SIZE_UPDATER.get(this) > config.getRetentionSizeInMB() * 1024 * 1024; } + private boolean isLedgerRetentionOverSizeQuotaAfterDelete(long sizeToDelete) { + // Handle the -1 size limit as "infinite" size quota + return config.getRetentionSizeInMB() >= 0 + && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete > config.getRetentionSizeInMB() * 1024 * 1024; + } + private boolean isOffloadedNeedsDelete(OffloadContext offload) { long elapsedMs = clock.millis() - offload.getTimestamp(); @@ -2369,33 +2375,66 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (log.isDebugEnabled()) { log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId); } + + + long 
totalSizeToDelete = 0; + boolean retentionSizeQuotaMet = false; // skip ledger if retention constraint met for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) { - boolean expired = hasLedgerRetentionExpired(ls.getTimestamp()); - boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(); - - if (log.isDebugEnabled()) { - log.debug( - "[{}] Checking ledger {} -- time-old: {} sec -- " - + "expired: {} -- over-quota: {} -- current-ledger: {}", - name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired, - overRetentionQuota, currentLedger.getId()); - } + // currentLedger can not be deleted if (ls.getLedgerId() == currentLedger.getId()) { - log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name, - ls.getLedgerId()); + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name, + ls.getLedgerId()); + } break; - } else if (expired || isTruncate) { - log.debug("[{}] Ledger {} has expired or be truncated, expired is {}, isTruncate is {}, ts {}", name, ls.getLedgerId(), expired, isTruncate, ls.getTimestamp()); + } + // if truncate, all ledgers besides currentLedger are going to be deleted + if (isTruncate){ + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} has been truncated with ts {}", name, ls.getLedgerId(), ls.getTimestamp()); + } ledgersToDelete.add(ls); - } else if (overRetentionQuota || isTruncate) { - log.debug("[{}] Ledger {} is over quota or be truncated, overRetentionQuota is {}, isTruncate is {}", name, ls.getLedgerId(), overRetentionQuota, isTruncate); + continue; + } + + // if neither currentLedger nor truncate + if (!retentionSizeQuotaMet) { + totalSizeToDelete += ls.getSize(); + boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(); + boolean overRetentionQuotaAfterDelete = isLedgerRetentionOverSizeQuotaAfterDelete(totalSizeToDelete); + if (overRetentionQuota) { + if (overRetentionQuotaAfterDelete) { 
+ if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId()); + } + ledgersToDelete.add(ls); + continue; + } else { + retentionSizeQuotaMet = true; + ledgersToDelete.add(ls); + continue; + } + } else { + retentionSizeQuotaMet = true; + } + } + + if (hasLedgerRetentionExpired(ls.getTimestamp())) { + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} has expired, expired is {}, ts {}", name, ls.getLedgerId(), ls.getTimestamp()); + } ledgersToDelete.add(ls); + continue; } else { - log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId()); + // once retention constraint has been met, skip check + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId()); + } break; } } + for (LedgerInfo ls : ledgers.values()) { if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) { log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name, diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d55598c..1b31b5a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -40,6 +40,7 @@ import java.nio.ReadOnlyBufferException; import java.nio.charset.Charset; import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -1967,6 +1968,49 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { } @Test + public void testRetentionSize() throws Exception { + final int retentionSizeInMB = 5; + final int totalMessage = 10; + + // message size is 1MB + final int messageSize = 
1048576; + char[] data = new char[messageSize]; + Arrays.fill(data, 'a'); + byte [] message = new String(data).getBytes(Encoding); + + @Cleanup("shutdown") + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(retentionSizeInMB); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(1, TimeUnit.HOURS); + + + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("retention_size_ledger", config); + ManagedCursor c1 = ml.openCursor("c1"); + Position position = null; + for (int i = 0; i < totalMessage; i++) { + position = ml.addEntry(message); + } + // all ledgers are not delete yet since no entry has been acked for c1 + assertEquals(ml.getLedgersInfoAsList().size(), totalMessage); + + List<Entry> entryList = c1.readEntries(totalMessage); + if (null != position) { + c1.markDelete(position); + } + entryList.forEach(entry -> { + log.info("Read entry position {}:{}", entry.getLedgerId(), entry.getEntryId()); + entry.release(); + }); + + Awaitility.await().untilAsserted(() -> { + assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024); + assertEquals(ml.getLedgersInfoAsList().size(), 5); + }); + } + + @Test public void testTimestampOnWorkingLedger() throws Exception { ManagedLedgerConfig conf = new ManagedLedgerConfig(); conf.setMaxEntriesPerLedger(1);
