This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 5b31f9b Fix lost message issue due to ledger rollover. (#14703)
5b31f9b is described below
commit 5b31f9b19f0f1c374a45c972ea00f99ef5f31f17
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Mar 16 14:59:47 2022 +0800
Fix lost message issue due to ledger rollover. (#14703)
Cherry-pick https://github.com/apache/pulsar/pull/14664
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 ++++----
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 3 +++
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 23 ++++++++++++++++++++++
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0dda242..78e00cc 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -656,8 +656,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
} else if (state == State.ClosedLedger) {
// No ledger and no pending operations. Create a new ledger
- log.info("[{}] Creating a new ledger", name);
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger,
State.CreatingLedger)) {
+ log.info("[{}] Creating a new ledger", name);
this.lastLedgerCreationInitiationTimestamp =
System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
asyncCreateLedger(bookKeeper, config, digestType, this,
Collections.emptyMap());
@@ -1474,7 +1474,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
synchronized void createLedgerAfterClosed() {
if(isNeededCreateNewLedgerAfterCloseLedger()) {
- log.info("[{}] Creating a new ledger", name);
+ log.info("[{}] Creating a new ledger after closed", name);
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp =
System.currentTimeMillis();
mbean.startDataLedgerCreateOp();
@@ -1493,8 +1493,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public void rollCurrentLedgerIfFull() {
log.info("[{}] Start checking if current ledger is full", name);
- if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
- STATE_UPDATER.set(this, State.ClosingLedger);
+ if (currentLedgerEntries > 0 && currentLedgerIsFull()
+ && STATE_UPDATER.compareAndSet(this, State.LedgerOpened,
State.ClosingLedger)) {
currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
@Override
public void closeComplete(int rc, LedgerHandle lh, Object o) {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index cfa7f0b..5a56026 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2194,6 +2194,9 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
// roll a new ledger
int numLedgersBefore = ledger.getLedgersInfo().size();
ledger.getConfig().setMaxEntriesPerLedger(1);
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().atMost(20, TimeUnit.SECONDS)
.until(() -> ledger.getLedgersInfo().size() >
numLedgersBefore);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 690cb57..2f001a2 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1855,6 +1855,9 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
// let current ledger close
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
ml.rollCurrentLedgerIfFull();
// let retention expire
Thread.sleep(1500);
@@ -3004,4 +3007,24 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
verify(ledgerOffloader, times(1)).getOffloadPolicies();
}
+
+ @Test
+ public void testLedgerNotRolloverWithoutOpenState() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+
+ ManagedLedgerImpl ml =
spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state",
config));
+ ((PositionImpl)ml.addEntry("test1".getBytes())).getLedgerId();
+ long ledgerId2 =
((PositionImpl)ml.addEntry("test2".getBytes())).getLedgerId();
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ // Set state to CreatingLedger to avoid rollover
+ stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger);
+ ml.rollCurrentLedgerIfFull();
+ Field currentLedger =
ManagedLedgerImpl.class.getDeclaredField("currentLedger");
+ currentLedger.setAccessible(true);
+ LedgerHandle lh = (LedgerHandle) currentLedger.get(ml);
+ Awaitility.await()
+ .until(() -> ledgerId2 == lh.getId());
+ }
}