This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 936629cc684b1d45ebc47d3066f67bf7dbf9a067 Author: ran <[email protected]> AuthorDate: Sat Jul 3 01:40:37 2021 +0800 [Ledger] Fix ledger rollover scheduled task (#11116) ### Motivation Currently, the ledger rollover scheduled task will execute before reach the ledger maximum rollover time, this will cause the ledger doesn't roll over in time. ### Modifications Only make the ledger rollover scheduled task after the ledger created successfully. If the scheduled task was executed when there is no entry in the current ledger, the scheduled task will not be re-executed, and if there is new entry is added the ledger will rollover. ### Verifying this change Add a unit test to verify the ledger could be rolled over in time. (cherry picked from commit a092ceb63588c94ad12327464e65b5fb69d1569e) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 31 ++++++++++++---------- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 17 ++++++++++++ .../service/CurrentLedgerRolloverIfFullTest.java | 2 +- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7677c71..9a5cea0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -401,8 +401,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }); scheduleTimeoutTask(); - - scheduleRollOverLedgerTask(); } private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { @@ -467,7 +465,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1488,7 +1486,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3361,7 +3359,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte)); long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; - boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs; + boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs(); if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) { @@ -3663,15 +3661,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - private void scheduleRollOverLedgerTask() { - if (config.getMaximumRolloverTimeMs() > 0) { - long interval = config.getMaximumRolloverTimeMs(); - this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> { - rollCurrentLedgerIfFull(); - }), interval, interval, TimeUnit.MILLISECONDS); - } - } - private void checkAddTimeout() { long timeoutSec = config.getAddEntryTimeoutSeconds(); if (timeoutSec < 1) { @@ -3929,4 +3918,18 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return CompletableFuture.completedFuture(ensembles); }); } + + private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + this.lastLedgerCreatedTimestamp = clock.millis(); + if (config.getMaximumRolloverTimeMs() > 0) { + if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { + // new ledger has been created successfully + // and the previous checkLedgerRollTask is not done, we could cancel it + checkLedgerRollTask.cancel(true); + } + this.checkLedgerRollTask = this.scheduledExecutor.schedule( + safeRun(this::rollCurrentLedgerIfFull), getMaximumRolloverTimeMs(config), TimeUnit.MILLISECONDS); + } + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index e0b15fe..d55598c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2923,6 +2923,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { } @Test + public void testLedgerReachMaximumRolloverTime() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + config.setMaximumRolloverTime(1, TimeUnit.SECONDS); + + ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config); + long firstLedgerId = ml.addEntry("test".getBytes()).getLedgerId(); + + // the ledger rollover scheduled time is between 1000 and 1050 ms, + // wait 1100 ms, the ledger should be rolled over. + Awaitility.await() + .atMost(1100, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId()); + } + + @Test public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setRetentionTime(1, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java index 1bb8dcb..77ec229 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java @@ -72,7 +72,7 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase { managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); managedLedgerConfig.setMaxEntriesPerLedger(2); managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); - managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS); + managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS); int msgNum = 10; for (int i = 0; i < msgNum; i++) {
