This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 5b31f9b  Fix lost message issue due to ledger rollover. (#14703)
5b31f9b is described below

commit 5b31f9b19f0f1c374a45c972ea00f99ef5f31f17
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Mar 16 14:59:47 2022 +0800

    Fix lost message issue due to ledger rollover. (#14703)
    
    Cherry-pick https://github.com/apache/pulsar/pull/14664
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  8 ++++----
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  3 +++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 23 ++++++++++++++++++++++
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0dda242..78e00cc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -656,8 +656,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
         } else if (state == State.ClosedLedger) {
             // No ledger and no pending operations. Create a new ledger
-            log.info("[{}] Creating a new ledger", name);
             if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, 
State.CreatingLedger)) {
+                log.info("[{}] Creating a new ledger", name);
                 this.lastLedgerCreationInitiationTimestamp = 
System.currentTimeMillis();
                 mbean.startDataLedgerCreateOp();
                 asyncCreateLedger(bookKeeper, config, digestType, this, 
Collections.emptyMap());
@@ -1474,7 +1474,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     synchronized void createLedgerAfterClosed() {
         if(isNeededCreateNewLedgerAfterCloseLedger()) {
-            log.info("[{}] Creating a new ledger", name);
+            log.info("[{}] Creating a new ledger after closed", name);
             STATE_UPDATER.set(this, State.CreatingLedger);
             this.lastLedgerCreationInitiationTimestamp = 
System.currentTimeMillis();
             mbean.startDataLedgerCreateOp();
@@ -1493,8 +1493,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @Override
     public void rollCurrentLedgerIfFull() {
         log.info("[{}] Start checking if current ledger is full", name);
-        if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
-            STATE_UPDATER.set(this, State.ClosingLedger);
+        if (currentLedgerEntries > 0 && currentLedgerIsFull()
+                && STATE_UPDATER.compareAndSet(this, State.LedgerOpened, 
State.ClosingLedger)) {
             currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
                 @Override
                 public void closeComplete(int rc, LedgerHandle lh, Object o) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index cfa7f0b..5a56026 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2194,6 +2194,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         // roll a new ledger
         int numLedgersBefore = ledger.getLedgersInfo().size();
         ledger.getConfig().setMaxEntriesPerLedger(1);
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
         ledger.rollCurrentLedgerIfFull();
         Awaitility.await().atMost(20, TimeUnit.SECONDS)
                 .until(() -> ledger.getLedgersInfo().size() > 
numLedgersBefore);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 690cb57..2f001a2 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1855,6 +1855,9 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         c2.skipEntries(1, IndividualDeletedEntries.Exclude);
         // let current ledger close
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
         ml.rollCurrentLedgerIfFull();
         // let retention expire
         Thread.sleep(1500);
@@ -3004,4 +3007,24 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
         verify(ledgerOffloader, times(1)).getOffloadPolicies();
     }
+
+    @Test
+    public void testLedgerNotRolloverWithoutOpenState() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+
+        ManagedLedgerImpl ml = 
spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state", 
config));
+        ((PositionImpl)ml.addEntry("test1".getBytes())).getLedgerId();
+        long ledgerId2 = 
((PositionImpl)ml.addEntry("test2".getBytes())).getLedgerId();
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        // Set state to CreatingLedger to avoid rollover
+        stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger);
+        ml.rollCurrentLedgerIfFull();
+        Field currentLedger = 
ManagedLedgerImpl.class.getDeclaredField("currentLedger");
+        currentLedger.setAccessible(true);
+        LedgerHandle lh = (LedgerHandle) currentLedger.get(ml);
+        Awaitility.await()
+                .until(() -> ledgerId2 == lh.getId());
+    }
 }

Reply via email to