This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 84a488535b2 [improve][broker] Don't rollover empty ledgers based on inactivity (#21893)
84a488535b2 is described below

commit 84a488535b2dc9efbb68b4380889259fd0409f0a
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Jan 14 23:28:43 2024 -0800

    [improve][broker] Don't rollover empty ledgers based on inactivity (#21893)
    
    ### Motivation
    
    When `managedLedgerInactiveLedgerRolloverTimeSeconds` is set, let's say to `300` (5 minutes), the ledger will also get rolled in the case when no new entries (messages) were added to the ledger. This doesn't make sense.
    Empty ledgers are deleted, but having this extra churn is causing extra load on brokers, bookies, and metadata stores (zookeeper).
    
    ### Modifications
    
    Skip rolling the ledger if it is empty.
    
    (cherry picked from commit 49edc3d9ba8abf7ba4169653a8093e2f866d7f78)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 25 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 82db81f22d2..64c85f21fc6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4405,7 +4405,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @Override
     public boolean checkInactiveLedgerAndRollOver() {
         long currentTimeMs = System.currentTimeMillis();
-        if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
+        if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs
+                + inactiveLedgerRollOverTimeMs)) {
             log.info("[{}] Closing inactive ledger, last-add entry {}", name, lastAddEntryTimeMs);
             if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
                 LedgerHandle currentLedger = this.currentLedger;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index b990e434df3..6c4f21c3af2 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3898,6 +3898,30 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         factory.shutdown();
     }
 
+    @Test
+    public void testDontRollOverEmptyInactiveLedgers() throws Exception {
+        int inactiveLedgerRollOverTimeMs = 5;
+        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("rollover_inactive", config);
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        long ledgerId = ledger.currentLedger.getId();
+
+        Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+        ledger.checkInactiveLedgerAndRollOver();
+
+        Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+        ledger.checkInactiveLedgerAndRollOver();
+
+        assertEquals(ledger.currentLedger.getId(), ledgerId);
+
+        ledger.close();
+    }
+
     @Test
     public void testOffloadTaskCancelled() throws Exception {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
@@ -4093,6 +4117,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setInactiveLedgerRollOverTime(10, TimeUnit.MILLISECONDS);
         ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, config);
+        ml.addEntry("entry".getBytes(UTF_8));
 
         MutableBoolean isRolledOver = new MutableBoolean(false);
         retryStrategically((test) -> {

Reply via email to