This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 936629cc684b1d45ebc47d3066f67bf7dbf9a067
Author: ran <[email protected]>
AuthorDate: Sat Jul 3 01:40:37 2021 +0800

    [Ledger] Fix ledger rollover scheduled task (#11116)
    
    ### Motivation
    
    Currently, the ledger rollover scheduled task will execute before reach the 
ledger maximum rollover time, this will cause the ledger doesn't roll over in 
time.
    
    ### Modifications
    
    Only make the ledger rollover scheduled task after the ledger created 
successfully.
    If the scheduled task was executed when there is no entry in the current 
ledger, the scheduled task will not be re-executed, and if there is new entry 
is added the ledger will rollover.
    
    ### Verifying this change
    
    Add a unit test to verify the ledger could be rolled over in time.
    
    (cherry picked from commit a092ceb63588c94ad12327464e65b5fb69d1569e)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 31 ++++++++++++----------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 17 ++++++++++++
 .../service/CurrentLedgerRolloverIfFullTest.java   |  2 +-
 3 files changed, 35 insertions(+), 15 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7677c71..9a5cea0 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -401,8 +401,6 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         });
 
         scheduleTimeoutTask();
-
-        scheduleRollOverLedgerTask();
     }
 
     private synchronized void initializeBookKeeper(final 
ManagedLedgerInitializeLedgerCallback callback) {
@@ -467,7 +465,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
                 log.info("[{}] Created ledger {}", name, lh.getId());
                 STATE_UPDATER.set(this, State.LedgerOpened);
-                lastLedgerCreatedTimestamp = clock.millis();
+                updateLastLedgerCreatedTimeAndScheduleRolloverTask();
                 currentLedger = lh;
 
                 lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
@@ -1488,7 +1486,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     public synchronized void updateLedgersIdsComplete(Stat stat) {
         STATE_UPDATER.set(this, State.LedgerOpened);
-        lastLedgerCreatedTimestamp = clock.millis();
+        updateLastLedgerCreatedTimeAndScheduleRolloverTask();
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Resending {} pending messages", name, 
pendingAddEntries.size());
@@ -3361,7 +3359,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * 
MegaByte));
 
         long timeSinceLedgerCreationMs = clock.millis() - 
lastLedgerCreatedTimestamp;
-        boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= 
maximumRolloverTimeMs;
+        boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= 
config.getMaximumRolloverTimeMs();
 
         if (spaceQuotaReached || maxLedgerTimeReached) {
             if (config.getMinimumRolloverTimeMs() > 0) {
@@ -3663,15 +3661,6 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
-    private void scheduleRollOverLedgerTask() {
-        if (config.getMaximumRolloverTimeMs() > 0) {
-            long interval = config.getMaximumRolloverTimeMs();
-            this.checkLedgerRollTask = 
this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
-                rollCurrentLedgerIfFull();
-            }), interval, interval, TimeUnit.MILLISECONDS);
-        }
-    }
-
     private void checkAddTimeout() {
         long timeoutSec = config.getAddEntryTimeoutSeconds();
         if (timeoutSec < 1) {
@@ -3929,4 +3918,18 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             return CompletableFuture.completedFuture(ensembles);
         });
     }
+
+    private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
+        this.lastLedgerCreatedTimestamp = clock.millis();
+        if (config.getMaximumRolloverTimeMs() > 0) {
+            if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) {
+                // new ledger has been created successfully
+                // and the previous checkLedgerRollTask is not done, we could 
cancel it
+                checkLedgerRollTask.cancel(true);
+            }
+            this.checkLedgerRollTask = this.scheduledExecutor.schedule(
+                    safeRun(this::rollCurrentLedgerIfFull), 
getMaximumRolloverTimeMs(config), TimeUnit.MILLISECONDS);
+        }
+    }
+
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e0b15fe..d55598c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2923,6 +2923,23 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testLedgerReachMaximumRolloverTime() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
+
+        ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", 
config);
+        long firstLedgerId = ml.addEntry("test".getBytes()).getLedgerId();
+
+        // the ledger rollover scheduled time is between 1000 and 1050 ms,
+        // wait 1100 ms, the ledger should be rolled over.
+        Awaitility.await()
+                .atMost(1100, TimeUnit.MILLISECONDS)
+                .pollInterval(100, TimeUnit.MILLISECONDS)
+                .until(() -> firstLedgerId != 
ml.addEntry("test".getBytes()).getLedgerId());
+    }
+
+    @Test
     public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws 
Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setRetentionTime(1, TimeUnit.SECONDS);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index 1bb8dcb..77ec229 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -72,7 +72,7 @@ public class CurrentLedgerRolloverIfFullTest extends 
BrokerTestBase {
         managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
         managedLedgerConfig.setMaxEntriesPerLedger(2);
         managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
-        managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS);
+        managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS);
 
         int msgNum = 10;
         for (int i = 0; i < msgNum; i++) {

Reply via email to