This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8ca6f19 Fix retention size policy delete too much ledgers (#11242)
8ca6f19 is described below
commit 8ca6f193afa5194ef49130ff1a7b871ecb00c828
Author: Aloys <[email protected]>
AuthorDate: Thu Jul 8 12:21:24 2021 +0800
Fix retention size policy delete too much ledgers (#11242)
Fixes #11241
### Motivation
This pull request fixes a bug where the retention size policy deletes too many
ledgers when none of the cursors has any backlog.
### Modifications
1. Generate `ledgersToDelete` by checking ledger sizes one by one instead
of checking only `TOTAL_SIZE_UPDATER`
2. Refactor the logic that generates `ledgersToDelete`
3. Add a test for the retention size policy
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 73 +++++++++++++++++-----
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 44 +++++++++++++
2 files changed, 100 insertions(+), 17 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e3148b9..48c69c8 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2304,6 +2304,12 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
&& TOTAL_SIZE_UPDATER.get(this) >
config.getRetentionSizeInMB() * 1024 * 1024;
}
+ private boolean isLedgerRetentionOverSizeQuotaAfterDelete(long
sizeToDelete) {
+ // Handle the -1 size limit as "infinite" size quota
+ return config.getRetentionSizeInMB() >= 0
+ && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >
config.getRetentionSizeInMB() * 1024 * 1024;
+ }
+
private boolean isOffloadedNeedsDelete(OffloadContext offload) {
long elapsedMs = clock.millis() - offload.getTimestamp();
@@ -2369,33 +2375,66 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Slowest consumer ledger id: {}", name,
slowestReaderLedgerId);
}
+
+
+ long totalSizeToDelete = 0;
+ boolean retentionSizeQuotaMet = false;
// skip ledger if retention constraint met
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId,
false).values()) {
- boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
- boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
-
- if (log.isDebugEnabled()) {
- log.debug(
- "[{}] Checking ledger {} -- time-old: {} sec -- "
- + "expired: {} -- over-quota: {} --
current-ledger: {}",
- name, ls.getLedgerId(), (clock.millis() -
ls.getTimestamp()) / 1000.0, expired,
- overRetentionQuota, currentLedger.getId());
- }
+ // currentLedger can not be deleted
if (ls.getLedgerId() == currentLedger.getId()) {
- log.debug("[{}] Ledger {} skipped for deletion as it is
currently being written to", name,
- ls.getLedgerId());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger {} skipped for deletion as it
is currently being written to", name,
+ ls.getLedgerId());
+ }
break;
- } else if (expired || isTruncate) {
- log.debug("[{}] Ledger {} has expired or be truncated,
expired is {}, isTruncate is {}, ts {}", name, ls.getLedgerId(), expired,
isTruncate, ls.getTimestamp());
+ }
+ // if truncate, all ledgers besides currentLedger are going to
be deleted
+ if (isTruncate){
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger {} has been truncated with ts
{}", name, ls.getLedgerId(), ls.getTimestamp());
+ }
ledgersToDelete.add(ls);
- } else if (overRetentionQuota || isTruncate) {
- log.debug("[{}] Ledger {} is over quota or be truncated,
overRetentionQuota is {}, isTruncate is {}", name, ls.getLedgerId(),
overRetentionQuota, isTruncate);
+ continue;
+ }
+
+ // if neither currentLedger nor truncate
+ if (!retentionSizeQuotaMet) {
+ totalSizeToDelete += ls.getSize();
+ boolean overRetentionQuota =
isLedgerRetentionOverSizeQuota();
+ boolean overRetentionQuotaAfterDelete =
isLedgerRetentionOverSizeQuotaAfterDelete(totalSizeToDelete);
+ if (overRetentionQuota) {
+ if (overRetentionQuotaAfterDelete) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger {} is over quota",
name, ls.getLedgerId());
+ }
+ ledgersToDelete.add(ls);
+ continue;
+ } else {
+ retentionSizeQuotaMet = true;
+ ledgersToDelete.add(ls);
+ continue;
+ }
+ } else {
+ retentionSizeQuotaMet = true;
+ }
+ }
+
+ if (hasLedgerRetentionExpired(ls.getTimestamp())) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger {} has expired, expired is {},
ts {}", name, ls.getLedgerId(), ls.getTimestamp());
+ }
ledgersToDelete.add(ls);
+ continue;
} else {
- log.debug("[{}] Ledger {} not deleted. Neither expired nor
over-quota", name, ls.getLedgerId());
+ // once retention constraint has been met, skip check
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger {} not deleted. Neither expired
nor over-quota", name, ls.getLedgerId());
+ }
break;
}
}
+
for (LedgerInfo ls : ledgers.values()) {
if (isOffloadedNeedsDelete(ls.getOffloadContext()) &&
!ledgersToDelete.contains(ls)) {
log.debug("[{}] Ledger {} has been offloaded, bookkeeper
ledger needs to be deleted", name,
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 62b44b8..25c1725 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -44,6 +44,7 @@ import java.nio.ReadOnlyBufferException;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -1971,6 +1972,49 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
}
@Test
+ public void testRetentionSize() throws Exception {
+ final int retentionSizeInMB = 5;
+ final int totalMessage = 10;
+
+ // message size is 1MB
+ final int messageSize = 1048576;
+ char[] data = new char[messageSize];
+ Arrays.fill(data, 'a');
+ byte [] message = new String(data).getBytes(Encoding);
+
+ @Cleanup("shutdown")
+ ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionSizeInMB(retentionSizeInMB);
+ config.setMaxEntriesPerLedger(1);
+ config.setRetentionTime(1, TimeUnit.HOURS);
+
+
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
factory.open("retention_size_ledger", config);
+ ManagedCursor c1 = ml.openCursor("c1");
+ Position position = null;
+ for (int i = 0; i < totalMessage; i++) {
+ position = ml.addEntry(message);
+ }
+ // all ledgers are not delete yet since no entry has been acked for c1
+ assertEquals(ml.getLedgersInfoAsList().size(), totalMessage);
+
+ List<Entry> entryList = c1.readEntries(totalMessage);
+ if (null != position) {
+ c1.markDelete(position);
+ }
+ entryList.forEach(entry -> {
+ log.info("Read entry position {}:{}", entry.getLedgerId(),
entry.getEntryId());
+ entry.release();
+ });
+
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024);
+ assertEquals(ml.getLedgersInfoAsList().size(), 5);
+ });
+ }
+
+ @Test
public void testTimestampOnWorkingLedger() throws Exception {
ManagedLedgerConfig conf = new ManagedLedgerConfig();
conf.setMaxEntriesPerLedger(1);