This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new f1d6a15956a [improve] [ml] Persist mark deleted ops to ZK if create
cursor ledger was failed (#20935)
f1d6a15956a is described below
commit f1d6a15956a47d946ef03d2396ed29f3f165a8ea
Author: fengyubiao <[email protected]>
AuthorDate: Fri Sep 1 02:37:00 2023 +0800
[improve] [ml] Persist mark deleted ops to ZK if create cursor ledger was
failed (#20935)
The process of persisting the mark-deleted position is as follows:
- persist to BK
- If failed to persist to BK, try to persist to ZK
However, in the current implementation, if creating the cursor ledger fails,
Pulsar does not try to persist to ZK. As a result, while cursor ledger creation
keeps failing, many ack records cannot be persisted, and a lot of messages are
consumed again after BK recovers.
Modifications: Try to persist the mark-deleted position to ZK if creating the
cursor ledger fails.
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 77 ++++++++++------
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 28 ++++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 102 +++++++++++++++++++++
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 64 +++++++++++++
4 files changed, 241 insertions(+), 30 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8ce3a322c09..e2b202cce15 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2099,7 +2099,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
});
- persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() {
+ VoidCallback cb = new VoidCallback() {
@Override
public void operationComplete() {
if (log.isDebugEnabled()) {
@@ -2151,7 +2151,18 @@ public class ManagedCursorImpl implements ManagedCursor {
mdEntry.triggerFailed(exception);
}
- });
+ };
+
+ if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
+ if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
+ persistPositionToMetaStore(mdEntry, cb);
+ } else {
+ mdEntry.callback.markDeleteFailed(new
ManagedLedgerException("Create new cursor ledger failed"),
+ mdEntry.ctx);
+ }
+ } else {
+ persistPositionToLedger(cursorLedger, mdEntry, cb);
+ }
}
@Override
@@ -2797,16 +2808,15 @@ public class ManagedCursorImpl implements ManagedCursor
{
@Override
public void operationFailed(ManagedLedgerException exception) {
- log.error("[{}][{}] Metadata ledger creation failed",
ledger.getName(), name, exception);
+ log.error("[{}][{}] Metadata ledger creation failed {}, try to
persist the position in the metadata"
+ + " store.", ledger.getName(), name, exception);
synchronized (pendingMarkDeleteOps) {
- while (!pendingMarkDeleteOps.isEmpty()) {
- MarkDeleteEntry entry = pendingMarkDeleteOps.poll();
- entry.callback.markDeleteFailed(exception, entry.ctx);
- }
-
// At this point we don't have a ledger ready
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
+ // Note: if the stat is NoLedger, will persist the mark
deleted position to metadata store.
+ // Before giving up, try to persist the position in the
metadata store.
+ flushPendingMarkDeletes();
}
}
});
@@ -3073,32 +3083,39 @@ public class ManagedCursorImpl implements ManagedCursor
{
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this,
State.Open, State.NoLedger);
- mbean.persistToLedger(false);
- // Before giving up, try to persist the position in the
metadata store
- persistPositionMetaStore(-1, position, mdEntry.properties, new
MetaStoreCallback<Void>() {
- @Override
- public void operationComplete(Void result, Stat stat) {
- if (log.isDebugEnabled()) {
- log.debug(
- "[{}][{}] Updated cursor in meta store
after previous failure in ledger at position"
- + " {}", ledger.getName(), name, position);
- }
- mbean.persistToZookeeper(true);
- callback.operationComplete();
- }
-
- @Override
- public void operationFailed(MetaStoreException e) {
- log.warn("[{}][{}] Failed to update cursor in meta
store after previous failure in ledger: {}",
- ledger.getName(), name, e.getMessage());
- mbean.persistToZookeeper(false);
-
callback.operationFailed(createManagedLedgerException(rc));
- }
- }, true);
+ // Before giving up, try to persist the position in the
metadata store.
+ persistPositionToMetaStore(mdEntry, callback);
}
}, null);
}
+ /**
+ * Persists the mark-delete position of {@code mdEntry} to the metadata store instead of the
+ * cursor ledger. Called when writing to the cursor ledger failed, or when the cursor is in the
+ * {@code NoLedger} state and has no ledger to write to.
+ *
+ * <p>Moves the cursor state from {@code Open} to {@code NoLedger} (no-op if it is not
+ * {@code Open}) and reports a failed ledger persist to the cursor mbean before writing.
+ *
+ * @param mdEntry the mark-delete entry whose position and properties are persisted
+ * @param callback completed on success; on failure it receives the metadata-store error
+ * converted to a {@link ManagedLedgerException}
+ */
+ void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {
+ final PositionImpl newPosition = mdEntry.newPosition;
+ // A plain instance method: the unqualified "this" is the cursor itself.
+ STATE_UPDATER.compareAndSet(this, State.Open, State.NoLedger);
+ mbean.persistToLedger(false);
+ persistPositionMetaStore(-1, newPosition, mdEntry.properties, new MetaStoreCallback<Void>() {
+ @Override
+ public void operationComplete(Void result, Stat stat) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
+ + " {}", ledger.getName(), name, newPosition);
+ }
+ mbean.persistToZookeeper(true);
+ callback.operationComplete();
+ }
+
+ @Override
+ public void operationFailed(MetaStoreException e) {
+ log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
+ ledger.getName(), name, e.getMessage());
+ mbean.persistToZookeeper(false);
+ callback.operationFailed(createManagedLedgerException(e));
+ }
+ }, true);
+ }
+
boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
if (ledger.getFactory().isMetadataServiceAvailable()
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 03fb3191915..fb9da6db60e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3524,6 +3524,34 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return positionToReturn;
}
+ /**
+ * Returns whether no entry exists after {@code pos} in this managed ledger, i.e. whether
+ * {@code pos} is at or beyond the last confirmed entry.
+ *
+ * <p>If the last confirmed entry is a placeholder with entry id {@code -1} (e.g. a freshly
+ * created, still-empty ledger), the previous position is used instead. It keeps walking back
+ * until a real entry is found or {@link #getPreviousPosition} cannot step back any further.
+ *
+ * @param pos the position to check
+ * @return {@code true} if there are no entries after {@code pos}
+ */
+ public boolean isNoMessagesAfterPos(PositionImpl pos) {
+ PositionImpl lac = (PositionImpl) getLastConfirmedEntry();
+ return isNoMessagesAfterPosForSpecifiedLac(lac, pos);
+ }
+
+ /**
+ * Same as {@link #isNoMessagesAfterPos(PositionImpl)}, but relative to {@code specifiedLac}.
+ * Placeholder positions (entry id {@code -1}) are resolved iteratively, so a long run of empty
+ * ledgers does not deepen the call stack.
+ */
+ private boolean isNoMessagesAfterPosForSpecifiedLac(PositionImpl specifiedLac, PositionImpl pos) {
+ PositionImpl lac = specifiedLac;
+ while (true) {
+ if (pos.compareTo(lac) >= 0) {
+ return true;
+ }
+ if (lac.getEntryId() >= 0) {
+ // The LAC is a real entry and pos is before it.
+ return false;
+ }
+ // The LAC is a placeholder; step back to find the meaningful LAC.
+ PositionImpl prevLac = getPreviousPosition(lac);
+ if (prevLac.getEntryId() >= 0) {
+ return pos.compareTo(prevLac) >= 0;
+ }
+ if (prevLac.equals(lac)) {
+ // Cannot step back any further: no entries in the managed ledger.
+ return true;
+ }
+ // Still a placeholder from an earlier ledger; keep looking for a valid LAC.
+ lac = prevLac;
+ }
+ }
+
/**
* Get the entry position that come before the specified position in the
message stream, using information from the
* ledger list and each ledger entries count.
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1b1b5534256..627ae73d928 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -229,6 +229,97 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
entries.forEach(Entry::release);
}
+ // Verifies that the mark-deleted position is persisted to the metadata store (ZK) when the
+ // cursor ledger cannot be created, and that it is recovered after reopening the managed ledger.
+ @Test
+ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
+ final int entryCount = 10;
+ final String cursorName = "c1";
+ final String mlName = "ml_test";
+ final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+
+ ManagedCursor cursor = ml.openCursor(cursorName);
+ Position lastEntry = null;
+ for (int i = 0; i < entryCount; i++) {
+ lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
+ }
+
+ // Mock cursor ledger creation failure.
+ bkc.failNow(BKException.Code.NoBookieAvailableException);
+
+ cursor.markDelete(lastEntry);
+
+ // Assert the mark-deleted position was persisted to ZK rather than to a cursor ledger.
+ PositionImpl slowestReadPosition = ml.getCursors().getSlowestReaderPosition();
+ assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
+ assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+ assertEquals(cursor.getStats().getPersistLedgerSucceed(), 0);
+ assertTrue(cursor.getStats().getPersistZookeeperSucceed() > 0);
+ assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
+
+ // Verify the mark-delete position can be recovered properly.
+ ml.close();
+ ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+ ManagedCursorImpl cursorRecovered = (ManagedCursorImpl) ml.openCursor(cursorName);
+ assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(), lastEntry);
+
+ // Cleanup.
+ ml.delete();
+ }
+
+ // Verifies that, after a cursor ledger write failure switches the cursor to "NoLedger", further
+ // mark-deletes are still persisted to ZK even while cursor ledger creation keeps failing.
+ @Test
+ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception
{
+ final int entryCount = 10;
+ final String cursorName = "c1";
+ final String mlName = "ml_test";
+ final ManagedLedgerConfig mlConfig = new
ManagedLedgerConfig().setMaxEntriesPerLedger(1);
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName,
mlConfig);
+
+ final ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.openCursor(cursorName);
+ ArrayList<Position> positions = new ArrayList<>();
+ for (int i = 0; i < entryCount; i++) {
+ positions.add(ml.addEntry(("entry-" + i).getBytes(Encoding)));
+ }
+ // Trigger creation of the cursor ledger.
+ cursor.markDelete(positions.get(0));
+ assertTrue(cursor.getStats().getPersistLedgerSucceed() > 0);
+
+ // Mock a cursor ledger write failure.
+ bkc.addEntryFailAfter(0, BKException.Code.NoBookieAvailableException);
+ // Trigger a failed write to the cursor ledger, then wait for the cursor state to become "NoLedger".
+ // This time ZK is written because the write to BK failed.
+ cursor.markDelete(positions.get(1));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cursor.getState(), "NoLedger");
+ });
+ assertTrue(cursor.getStats().getPersistLedgerErrors() > 0);
+ long persistZookeeperSucceed1 =
cursor.getStats().getPersistZookeeperSucceed();
+ assertTrue(persistZookeeperSucceed1 > 0);
+
+ // Mock a cursor ledger creation failure.
+ bkc.failNow(BKException.Code.NoBookieAvailableException);
+ // Verify the cursor position is persisted to ZK even if cursor ledger creation always fails.
+ // This time ZK is written because the cursor has no ledger and lastEntry is the last entry
+ // (there are no messages after it).
+ Position lastEntry = positions.get(entryCount -1);
+ cursor.markDelete(lastEntry);
+ long persistZookeeperSucceed2 =
cursor.getStats().getPersistZookeeperSucceed();
+ assertTrue(persistZookeeperSucceed2 > persistZookeeperSucceed1);
+
+ // Assert the mark-deleted position was persisted to ZK successfully.
+ PositionImpl slowestReadPosition =
ml.getCursors().getSlowestReaderPosition();
+ assertTrue(slowestReadPosition.getLedgerId() >=
lastEntry.getLedgerId());
+ assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+ assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
+
+ // Verify the mark-delete position can be recovered properly after reopening.
+ ml.close();
+ ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);
+ ManagedCursorImpl cursorRecovered = (ManagedCursorImpl)
ml.openCursor(cursorName);
+ assertEquals(cursorRecovered.getPersistentMarkDeletedPosition(),
lastEntry);
+
+ // Cleanup.
+ ml.delete();
+ }
+
@Test(timeOut = 20000)
void readWithCacheDisabled() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
@@ -1421,6 +1512,17 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
ledger = factory2.open("my_test_ledger");
ManagedCursor cursor = ledger.openCursor("c1");
Position position = ledger.addEntry("test".getBytes());
+ // Make persist zk fail once.
+ AtomicInteger persistZKTimes = new AtomicInteger();
+ metadataStore.failConditional(new
MetadataStoreException.BadVersionException("mock ex"), (type, path) -> {
+ if (FaultInjectionMetadataStore.OperationType.PUT.equals(type)
+ && path.equals("/managed-ledgers/my_test_ledger/c1")) {
+ if (persistZKTimes.incrementAndGet() == 1) {
+ return true;
+ }
+ }
+ return false;
+ });
try {
cursor.markDelete(position);
fail("should have failed");
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 70ddbb9998f..5fc2da22b66 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3968,6 +3968,70 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
}
+ // Verifies ManagedLedgerImpl#isNoMessagesAfterPos with one ledger, several ledgers, a last
+ // confirmed entry whose entry id is -1, and a managed ledger whose entries were all trimmed.
+ @Test
+ public void testIsNoMessagesAfterPos() throws Exception {
+ final byte[] data = new byte[]{1,2,3};
+ final String cursorName = "c1";
+ final String mlName = UUID.randomUUID().toString().replaceAll("-", "");
+ final ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
+ // Not read directly; the cursor is deleted below before trimming the consumed ledgers.
+ final ManagedCursor managedCursor = ml.openCursor(cursorName);
+
+ // One ledger.
+ PositionImpl p1 = (PositionImpl) ml.addEntry(data);
+ PositionImpl p2 = (PositionImpl) ml.addEntry(data);
+ PositionImpl p3 = (PositionImpl) ml.addEntry(data);
+ assertFalse(ml.isNoMessagesAfterPos(p1));
+ assertFalse(ml.isNoMessagesAfterPos(p2));
+ assertTrue(ml.isNoMessagesAfterPos(p3));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p3.getLedgerId(),
p3.getEntryId() + 1)));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p3.getLedgerId() +
1, -1)));
+
+ // More than one ledger.
+ ml.ledgerClosed(ml.currentLedger);
+ PositionImpl p4 = (PositionImpl) ml.addEntry(data);
+ PositionImpl p5 = (PositionImpl) ml.addEntry(data);
+ PositionImpl p6 = (PositionImpl) ml.addEntry(data);
+ assertFalse(ml.isNoMessagesAfterPos(p1));
+ assertFalse(ml.isNoMessagesAfterPos(p2));
+ assertFalse(ml.isNoMessagesAfterPos(p3));
+ assertFalse(ml.isNoMessagesAfterPos(p4));
+ assertFalse(ml.isNoMessagesAfterPos(p5));
+ assertTrue(ml.isNoMessagesAfterPos(p6));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(),
p6.getEntryId() + 1)));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() +
1, -1)));
+
+ // Switch to a new, empty ledger and set the entry id of the last confirmed entry to -1.
+ ml.ledgerClosed(ml.currentLedger);
+ ml.createLedgerAfterClosed();
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ml.currentLedgerEntries, 0);
+ });
+ ml.lastConfirmedEntry = PositionImpl.get(ml.currentLedger.getId(), -1);
+ assertFalse(ml.isNoMessagesAfterPos(p5));
+ assertTrue(ml.isNoMessagesAfterPos(p6));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(),
p6.getEntryId() + 1)));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() +
1, -1)));
+
+ // Trim ledgers so that no entries are left in the managed ledger.
+ ml.deleteCursor(cursorName);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ml.trimConsumedLedgersInBackground(true, future);
+ future.get();
+ assertEquals(ml.ledgers.size(), 1);
+ assertEquals(ml.lastConfirmedEntry.getEntryId(), -1);
+ assertTrue(ml.isNoMessagesAfterPos(p1));
+ assertTrue(ml.isNoMessagesAfterPos(p2));
+ assertTrue(ml.isNoMessagesAfterPos(p3));
+ assertTrue(ml.isNoMessagesAfterPos(p4));
+ assertTrue(ml.isNoMessagesAfterPos(p5));
+ assertTrue(ml.isNoMessagesAfterPos(p6));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId(),
p6.getEntryId() + 1)));
+ assertTrue(ml.isNoMessagesAfterPos(PositionImpl.get(p6.getLedgerId() +
1, -1)));
+
+ // Cleanup.
+ ml.close();
+ }
+
@Test
public void testGetEstimatedBacklogSize() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();