This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 5348e1b9124 [improve][broker] Don't rollover empty ledgers based on
inactivity (#21893)
5348e1b9124 is described below
commit 5348e1b9124052a454f66769ae3e9f54ee0a75d4
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Jan 14 23:28:43 2024 -0800
[improve][broker] Don't rollover empty ledgers based on inactivity (#21893)
### Motivation
When `managedLedgerInactiveLedgerRolloverTimeSeconds` is set, for example to
`300` (5 minutes), the ledger is rolled over even when no new entries
(messages) have been added to it. Rolling over an empty ledger serves no
purpose: empty ledgers are deleted anyway, and the extra churn puts
unnecessary load on brokers, bookies, and metadata stores (ZooKeeper).
### Modifications
Skip the inactivity-based rollover when the current ledger is empty (contains no entries).
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 ++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 25 ++++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 82a3a842454..a4bfb5d6c9e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4459,7 +4459,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public boolean checkInactiveLedgerAndRollOver() {
long currentTimeMs = System.currentTimeMillis();
- if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs >
(lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
+ if (currentLedgerEntries > 0 && inactiveLedgerRollOverTimeMs > 0 &&
currentTimeMs > (lastAddEntryTimeMs
+ + inactiveLedgerRollOverTimeMs)) {
log.info("[{}] Closing inactive ledger, last-add entry {}", name,
lastAddEntryTimeMs);
if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened,
State.ClosingLedger)) {
LedgerHandle currentLedger = this.currentLedger;
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8430afb4e4f..4c92911c687 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3898,6 +3898,30 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testDontRollOverEmptyInactiveLedgers() throws Exception {
+ int inactiveLedgerRollOverTimeMs = 5;
+ ManagedLedgerFactoryConfig factoryConf = new
ManagedLedgerFactoryConfig();
+ @Cleanup("shutdown")
+ ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs,
TimeUnit.MILLISECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("rollover_inactive", config);
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ long ledgerId = ledger.currentLedger.getId();
+
+ Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+ ledger.checkInactiveLedgerAndRollOver();
+
+ Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+ ledger.checkInactiveLedgerAndRollOver();
+
+ assertEquals(ledger.currentLedger.getId(), ledgerId);
+
+ ledger.close();
+ }
+
@Test
public void testOffloadTaskCancelled() throws Exception {
@Cleanup("shutdown")
@@ -4094,6 +4118,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(10, TimeUnit.MILLISECONDS);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName,
config);
+ ml.addEntry("entry".getBytes(UTF_8));
MutableBoolean isRolledOver = new MutableBoolean(false);
retryStrategically((test) -> {