This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 84a488535b2 [improve][broker] Don't rollover empty ledgers based on
inactivity (#21893)
84a488535b2 is described below
commit 84a488535b2dc9efbb68b4380889259fd0409f0a
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Jan 14 23:28:43 2024 -0800
[improve][broker] Don't rollover empty ledgers based on inactivity (#21893)
### Motivation
When `managedLedgerInactiveLedgerRolloverTimeSeconds` is set, for example to
`300` (5 minutes), the current ledger is rolled over after that period of
inactivity even when no entries (messages) have been added to it. Rolling
over an empty ledger serves no purpose: empty ledgers are deleted, but this
extra churn causes extra load on brokers, bookies, and metadata stores
(ZooKeeper).
### Modifications
Skip the inactivity-based rollover when the current ledger is empty, i.e.
only roll over when `currentLedgerEntries > 0`.
(cherry picked from commit 49edc3d9ba8abf7ba4169653a8093e2f866d7f78)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 ++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 25 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 82db81f22d2..64c85f21fc6 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4405,7 +4405,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public boolean checkInactiveLedgerAndRollOver() {
long currentTimeMs = System.currentTimeMillis();
- if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs >
(lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
+ if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 &&
currentTimeMs > (lastAddEntryTimeMs
+ + inactiveLedgerRollOverTimeMs)) {
log.info("[{}] Closing inactive ledger, last-add entry {}", name,
lastAddEntryTimeMs);
if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened,
State.ClosingLedger)) {
LedgerHandle currentLedger = this.currentLedger;
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index b990e434df3..6c4f21c3af2 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3898,6 +3898,30 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
factory.shutdown();
}
+ @Test
+ public void testDontRollOverEmptyInactiveLedgers() throws Exception {
+ int inactiveLedgerRollOverTimeMs = 5;
+ ManagedLedgerFactoryConfig factoryConf = new
ManagedLedgerFactoryConfig();
+ @Cleanup("shutdown")
+ ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs,
TimeUnit.MILLISECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("rollover_inactive", config);
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ long ledgerId = ledger.currentLedger.getId();
+
+ Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+ ledger.checkInactiveLedgerAndRollOver();
+
+ Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+ ledger.checkInactiveLedgerAndRollOver();
+
+ assertEquals(ledger.currentLedger.getId(), ledgerId);
+
+ ledger.close();
+ }
+
@Test
public void testOffloadTaskCancelled() throws Exception {
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
@@ -4093,6 +4117,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(10, TimeUnit.MILLISECONDS);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName,
config);
+ ml.addEntry("entry".getBytes(UTF_8));
MutableBoolean isRolledOver = new MutableBoolean(false);
retryStrategically((test) -> {