This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ca6f19  Fix retention size policy delete too much ledgers (#11242)
8ca6f19 is described below

commit 8ca6f193afa5194ef49130ff1a7b871ecb00c828
Author: Aloys <[email protected]>
AuthorDate: Thu Jul 8 12:21:24 2021 +0800

    Fix retention size policy delete too much ledgers (#11242)
    
    Fixes #11241
    ### Motivation
    This pull request fix the error that retention size policy delete too much 
ledgers when all cursors has no backlog
    
    ### Modifications
    
    1.  generate `ledgersToDelete` by checking ledger size one by one instead 
of `TOTAL_SIZE_UPDATER`
    2. refactor the logic of generate `ledgersToDelete`
    3. add test code
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 73 +++++++++++++++++-----
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 44 +++++++++++++
 2 files changed, 100 insertions(+), 17 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e3148b9..48c69c8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2304,6 +2304,12 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 && TOTAL_SIZE_UPDATER.get(this) > 
config.getRetentionSizeInMB() * 1024 * 1024;
     }
 
+    private boolean isLedgerRetentionOverSizeQuotaAfterDelete(long 
sizeToDelete) {
+        // Handle the -1 size limit as "infinite" size quota
+        return config.getRetentionSizeInMB() >= 0
+                &&  TOTAL_SIZE_UPDATER.get(this) - sizeToDelete > 
config.getRetentionSizeInMB() * 1024 * 1024;
+    }
+
     private boolean isOffloadedNeedsDelete(OffloadContext offload) {
         long elapsedMs = clock.millis() - offload.getTimestamp();
 
@@ -2369,33 +2375,66 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Slowest consumer ledger id: {}", name, 
slowestReaderLedgerId);
             }
+
+
+            long totalSizeToDelete = 0;
+            boolean retentionSizeQuotaMet = false;
             // skip ledger if retention constraint met
             for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, 
false).values()) {
-                boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
-                boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(
-                            "[{}] Checking ledger {} -- time-old: {} sec -- "
-                                    + "expired: {} -- over-quota: {} -- 
current-ledger: {}",
-                            name, ls.getLedgerId(), (clock.millis() - 
ls.getTimestamp()) / 1000.0, expired,
-                            overRetentionQuota, currentLedger.getId());
-                }
+                // currentLedger can not be deleted
                 if (ls.getLedgerId() == currentLedger.getId()) {
-                    log.debug("[{}] Ledger {} skipped for deletion as it is 
currently being written to", name,
-                            ls.getLedgerId());
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Ledger {} skipped for deletion as it 
is currently being written to", name,
+                                ls.getLedgerId());
+                    }
                     break;
-                } else if (expired || isTruncate) {
-                    log.debug("[{}] Ledger {} has expired or be truncated, 
expired is {}, isTruncate is {}, ts {}", name, ls.getLedgerId(), expired,  
isTruncate, ls.getTimestamp());
+                }
+                // if truncate, all ledgers besides currentLedger are going to 
be deleted
+                if (isTruncate){
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Ledger {} has been truncated with ts 
{}", name, ls.getLedgerId(), ls.getTimestamp());
+                    }
                     ledgersToDelete.add(ls);
-                } else if (overRetentionQuota || isTruncate) {
-                    log.debug("[{}] Ledger {} is over quota or be truncated, 
overRetentionQuota is {}, isTruncate is {}", name, ls.getLedgerId(), 
overRetentionQuota, isTruncate);
+                    continue;
+                }
+
+                // if neither currentLedger nor truncate
+                if (!retentionSizeQuotaMet) {
+                    totalSizeToDelete += ls.getSize();
+                    boolean overRetentionQuota = 
isLedgerRetentionOverSizeQuota();
+                    boolean overRetentionQuotaAfterDelete = 
isLedgerRetentionOverSizeQuotaAfterDelete(totalSizeToDelete);
+                    if (overRetentionQuota) {
+                        if (overRetentionQuotaAfterDelete) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Ledger {} is over quota", 
name, ls.getLedgerId());
+                            }
+                            ledgersToDelete.add(ls);
+                            continue;
+                        } else {
+                            retentionSizeQuotaMet = true;
+                            ledgersToDelete.add(ls);
+                            continue;
+                        }
+                    } else {
+                        retentionSizeQuotaMet = true;
+                    }
+                }
+
+                if (hasLedgerRetentionExpired(ls.getTimestamp())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Ledger {} has expired, expired is {}, 
ts {}", name, ls.getLedgerId(), ls.getTimestamp());
+                    }
                     ledgersToDelete.add(ls);
+                    continue;
                 } else {
-                    log.debug("[{}] Ledger {} not deleted. Neither expired nor 
over-quota", name, ls.getLedgerId());
+                    // once retention constraint has been met, skip check
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Ledger {} not deleted. Neither expired 
nor over-quota", name, ls.getLedgerId());
+                    }
                     break;
                 }
             }
+
             for (LedgerInfo ls : ledgers.values()) {
                 if (isOffloadedNeedsDelete(ls.getOffloadContext()) && 
!ledgersToDelete.contains(ls)) {
                     log.debug("[{}] Ledger {} has been offloaded, bookkeeper 
ledger needs to be deleted", name,
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 62b44b8..25c1725 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -44,6 +44,7 @@ import java.nio.ReadOnlyBufferException;
 import java.nio.charset.Charset;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -1971,6 +1972,49 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testRetentionSize() throws Exception {
+        final int retentionSizeInMB = 5;
+        final int totalMessage = 10;
+
+        // message size is 1MB
+        final int messageSize = 1048576;
+        char[] data = new char[messageSize];
+        Arrays.fill(data, 'a');
+        byte [] message = new String(data).getBytes(Encoding);
+
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(retentionSizeInMB);
+        config.setMaxEntriesPerLedger(1);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
factory.open("retention_size_ledger", config);
+        ManagedCursor c1 = ml.openCursor("c1");
+        Position position = null;
+        for (int i = 0; i < totalMessage; i++) {
+            position = ml.addEntry(message);
+        }
+        // all ledgers are not delete yet since no entry has been acked for c1
+        assertEquals(ml.getLedgersInfoAsList().size(), totalMessage);
+
+        List<Entry> entryList = c1.readEntries(totalMessage);
+        if (null != position) {
+            c1.markDelete(position);
+        }
+        entryList.forEach(entry -> {
+            log.info("Read entry position {}:{}", entry.getLedgerId(), 
entry.getEntryId());
+            entry.release();
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024);
+            assertEquals(ml.getLedgersInfoAsList().size(), 5);
+        });
+    }
+
+    @Test
     public void testTimestampOnWorkingLedger() throws Exception {
         ManagedLedgerConfig conf = new ManagedLedgerConfig();
         conf.setMaxEntriesPerLedger(1);

Reply via email to