This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 49edc3d9ba8 [improve][broker] Don't rollover empty ledgers based on 
inactivity (#21893)
49edc3d9ba8 is described below

commit 49edc3d9ba8abf7ba4169653a8093e2f866d7f78
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Jan 14 23:28:43 2024 -0800

    [improve][broker] Don't rollover empty ledgers based on inactivity (#21893)
    
    ### Motivation
    
    When `managedLedgerInactiveLedgerRolloverTimeSeconds` is set, let's say to 
`300` (5 minutes), the ledger will also get rolled in the case when no new 
entries (messages) were added to the ledger. This doesn't make sense.
    Empty ledgers are deleted, but having this extra churn is causing extra 
load on brokers, bookies, and metadata stores (zookeeper).
    
    ### Modifications
    
    Skip rolling the ledger if it is empty.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 ++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 25 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a6b196bda47..948eacce72d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4459,7 +4459,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     @Override
     public boolean checkInactiveLedgerAndRollOver() {
         long currentTimeMs = System.currentTimeMillis();
-        if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > 
(lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
+        if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 && 
currentTimeMs > (lastAddEntryTimeMs
+                + inactiveLedgerRollOverTimeMs)) {
             log.info("[{}] Closing inactive ledger, last-add entry {}", name, 
lastAddEntryTimeMs);
             if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, 
State.ClosingLedger)) {
                 LedgerHandle currentLedger = this.currentLedger;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8430afb4e4f..4c92911c687 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3898,6 +3898,30 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testDontRollOverEmptyInactiveLedgers() throws Exception {
+        int inactiveLedgerRollOverTimeMs = 5;
+        ManagedLedgerFactoryConfig factoryConf = new 
ManagedLedgerFactoryConfig();
+        @Cleanup("shutdown")
+        ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, 
TimeUnit.MILLISECONDS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("rollover_inactive", config);
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        long ledgerId = ledger.currentLedger.getId();
+
+        Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+        ledger.checkInactiveLedgerAndRollOver();
+
+        Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+        ledger.checkInactiveLedgerAndRollOver();
+
+        assertEquals(ledger.currentLedger.getId(), ledgerId);
+
+        ledger.close();
+    }
+
     @Test
     public void testOffloadTaskCancelled() throws Exception {
         @Cleanup("shutdown")
@@ -4094,6 +4118,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setInactiveLedgerRollOverTime(10, TimeUnit.MILLISECONDS);
         ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, 
config);
+        ml.addEntry("entry".getBytes(UTF_8));
 
         MutableBoolean isRolledOver = new MutableBoolean(false);
         retryStrategically((test) -> {

Reply via email to