This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b3394540c21 [fix][ml] Fix ledger left in OPEN state when enable 
`inactiveLedgerRollOverTimeMs` (#20276)
b3394540c21 is described below

commit b3394540c218bb139fcc0658df2e6f4b4ac459da
Author: lifepuzzlefun <[email protected]>
AuthorDate: Tue May 16 17:19:46 2023 +0800

    [fix][ml] Fix ledger left in OPEN state when enable 
`inactiveLedgerRollOverTimeMs` (#20276)
    
    Close `currentLedger` after rolling over the current ledger if it is full
    
    (cherry picked from commit 426ad3e53ba621c442bce89eb208add8f9d1563e)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 31 +++++++++++++++++++---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 15 +++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c74db884a95..15e9d332fa1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1775,15 +1775,19 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                                   + "acked ledgerId %s", 
currentLedger.getId(), lh.getId());
 
                     if (rc == BKException.Code.OK) {
-                        log.debug("Successfully closed ledger {}", lh.getId());
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Successfully closed ledger {}, 
trigger by rollover full ledger",
+                                    name, lh.getId());
+                        }
                     } else {
-                        log.warn("Error when closing ledger {}. Status={}", 
lh.getId(), BKException.getMessage(rc));
+                        log.warn("[{}] Error when closing ledger {}, trigger 
by rollover full ledger, Status={}",
+                                name, lh.getId(), BKException.getMessage(rc));
                     }
 
                     ledgerClosed(lh);
                     createLedgerAfterClosed();
                 }
-            }, System.nanoTime());
+            }, null);
         }
     }
 
@@ -4353,7 +4357,26 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         long currentTimeMs = System.currentTimeMillis();
         if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > 
(lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
             log.info("[{}] Closing inactive ledger, last-add entry {}", name, 
lastAddEntryTimeMs);
-            ledgerClosed(currentLedger);
+            if (STATE_UPDATER.compareAndSet(this, State.LedgerOpened, 
State.ClosingLedger)) {
+                LedgerHandle currentLedger = this.currentLedger;
+                currentLedger.asyncClose((rc, lh, o) -> {
+                    checkArgument(currentLedger.getId() == lh.getId(), 
"ledgerId %s doesn't match with "
+                            + "acked ledgerId %s", currentLedger.getId(), 
lh.getId());
+
+                    if (rc == BKException.Code.OK) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Successfully closed ledger {}, 
trigger by inactive ledger check",
+                                    name, lh.getId());
+                        }
+                    } else {
+                        log.warn("[{}] Error when closing ledger {}, trigger 
by inactive ledger check, Status={}",
+                                name, lh.getId(), BKException.getMessage(rc));
+                    }
+
+                    ledgerClosed(lh);
+                    // we do not create ledger here, since topic is inactive 
for a long time.
+                }, null);
+            }
         }
     }
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index dd30cde72e7..70ddbb9998f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -3859,12 +3860,26 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("rollover_inactive", config);
         ManagedCursor cursor = ledger.openCursor("c1");
 
+        List<Long> ledgerIds = new ArrayList<>();
+
         int totalAddEntries = 5;
         for (int i = 0; i < totalAddEntries; i++) {
             String content = "entry"; // 5 bytes
             ledger.checkInactiveLedgerAndRollOver();
             ledger.addEntry(content.getBytes());
             Thread.sleep(inactiveLedgerRollOverTimeMs * 5);
+
+            ledgerIds.add(ledger.currentLedger.getId());
+        }
+
+        Map<Long, PulsarMockLedgerHandle> ledgerMap = bkc.getLedgerMap();
+        // skip check last ledger, it should be open
+        for (int i = 0; i < ledgerIds.size() - 1; i++) {
+            long ledgerId = ledgerIds.get(i);
+            LedgerMetadata ledgerMetadata = 
ledgerMap.get(ledgerId).getLedgerMetadata();
+            if (ledgerMetadata != null) {
+                assertTrue(ledgerMetadata.isClosed());
+            }
         }
 
         List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();

Reply via email to