This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b3394540c21 [fix][ml] Fix ledger left in OPEN state when enable
`inactiveLedgerRollOverTimeMs` (#20276)
b3394540c21 is described below
commit b3394540c218bb139fcc0658df2e6f4b4ac459da
Author: lifepuzzlefun <[email protected]>
AuthorDate: Tue May 16 17:19:46 2023 +0800
[fix][ml] Fix ledger left in OPEN state when enable
`inactiveLedgerRollOverTimeMs` (#20276)
Close `currentLedger` after rolling over the current ledger when it is full
(cherry picked from commit 426ad3e53ba621c442bce89eb208add8f9d1563e)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 31 +++++++++++++++++++---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 15 +++++++++++
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c74db884a95..15e9d332fa1 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1775,15 +1775,19 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
+ "acked ledgerId %s",
currentLedger.getId(), lh.getId());
if (rc == BKException.Code.OK) {
- log.debug("Successfully closed ledger {}", lh.getId());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Successfully closed ledger {},
trigger by rollover full ledger",
+ name, lh.getId());
+ }
} else {
- log.warn("Error when closing ledger {}. Status={}",
lh.getId(), BKException.getMessage(rc));
+ log.warn("[{}] Error when closing ledger {}, trigger
by rollover full ledger, Status={}",
+ name, lh.getId(), BKException.getMessage(rc));
}
ledgerClosed(lh);
createLedgerAfterClosed();
}
- }, System.nanoTime());
+ }, null);
}
}
@@ -4353,7 +4357,26 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
long currentTimeMs = System.currentTimeMillis();
if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs >
(lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
log.info("[{}] Closing inactive ledger, last-add entry {}", name,
lastAddEntryTimeMs);
- ledgerClosed(currentLedger);
+ if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened,
State.ClosingLedger)) {
+ LedgerHandle currentLedger = this.currentLedger;
+ currentLedger.asyncClose((rc, lh, o) -> {
+ checkArgument(currentLedger.getId() == lh.getId(),
"ledgerId %s doesn't match with "
+ + "acked ledgerId %s", currentLedger.getId(),
lh.getId());
+
+ if (rc == BKException.Code.OK) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Successfully closed ledger {},
trigger by inactive ledger check",
+ name, lh.getId());
+ }
+ } else {
+ log.warn("[{}] Error when closing ledger {}, trigger
by inactive ledger check, Status={}",
+ name, lh.getId(), BKException.getMessage(rc));
+ }
+
+ ledgerClosed(lh);
+ // we do not create ledger here, since topic is inactive
for a long time.
+ }, null);
+ }
}
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index dd30cde72e7..70ddbb9998f 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -3859,12 +3860,26 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("rollover_inactive", config);
ManagedCursor cursor = ledger.openCursor("c1");
+ List<Long> ledgerIds = new ArrayList<>();
+
int totalAddEntries = 5;
for (int i = 0; i < totalAddEntries; i++) {
String content = "entry"; // 5 bytes
ledger.checkInactiveLedgerAndRollOver();
ledger.addEntry(content.getBytes());
Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+
+ ledgerIds.add(ledger.currentLedger.getId());
+ }
+
+ Map<Long, PulsarMockLedgerHandle> ledgerMap = bkc.getLedgerMap();
+ // skip check last ledger, it should be open
+ for (int i = 0; i < ledgerIds.size() - 1; i++) {
+ long ledgerId = ledgerIds.get(i);
+ LedgerMetadata ledgerMetadata =
ledgerMap.get(ledgerId).getLedgerMetadata();
+ if (ledgerMetadata != null) {
+ assertTrue(ledgerMetadata.isClosed());
+ }
}
List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();