codelipenghui commented on code in PR #20935:
URL: https://github.com/apache/pulsar/pull/20935#discussion_r1288082053
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -3073,32 +3078,39 @@ void persistPositionToLedger(final LedgerHandle lh,
MarkDeleteEntry mdEntry, fin
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this,
State.Open, State.NoLedger);
- mbean.persistToLedger(false);
- // Before giving up, try to persist the position in the
metadata store
- persistPositionMetaStore(-1, position, mdEntry.properties, new
MetaStoreCallback<Void>() {
- @Override
- public void operationComplete(Void result, Stat stat) {
- if (log.isDebugEnabled()) {
- log.debug(
- "[{}][{}] Updated cursor in meta store
after previous failure in ledger at position"
- + " {}", ledger.getName(), name, position);
- }
- mbean.persistToZookeeper(true);
- callback.operationComplete();
- }
-
- @Override
- public void operationFailed(MetaStoreException e) {
- log.warn("[{}][{}] Failed to update cursor in meta
store after previous failure in ledger: {}",
- ledger.getName(), name, e.getMessage());
- mbean.persistToZookeeper(false);
-
callback.operationFailed(createManagedLedgerException(rc));
- }
- }, true);
+ // Before giving up, try to persist the position in the
metadata store.
+ persistPositionMetaStore(mdEntry, callback);
}
}, null);
}
+ void persistPositionMetaStore(MarkDeleteEntry mdEntry, final VoidCallback
callback) {
Review Comment:
```suggestion
void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final
VoidCallback callback) {
```
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -229,6 +229,76 @@ void readTwice() throws Exception {
entries.forEach(Entry::release);
}
+ @Test
+ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception
{
+ ManagedLedgerImpl ml =
+ (ManagedLedgerImpl) factory.open("ml_test", new
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+ ManagedCursor cursor = ml.openCursor("c1");
+ Position lastEntry = null;
+ for (int i = 0; i < 10; i++) {
+ lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
+ }
+
+ // Mock cursor ledger create failed.
+ bkc.failNow(BKException.Code.NoBookieAvailableException);
+
+ cursor.markDelete(lastEntry);
+
+ // Assert persist mark deleted position to ZK was successful.
+ PositionImpl slowestReadPosition =
ml.getCursors().getSlowestReaderPosition();
+ assertTrue(slowestReadPosition.getLedgerId() >=
lastEntry.getLedgerId());
+ assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+ assertEquals(cursor.getStats().getPersistLedgerSucceed(), 0);
+ assertTrue(cursor.getStats().getPersistZookeeperSucceed() > 0);
+
+ // cleanup.
+ ml.delete();
+ }
+
+ @Test
+ void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception
{
+ ManagedLedgerImpl ml =
+ (ManagedLedgerImpl) factory.open("ml_test", new
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+ final int entryCount = 10;
+
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
+ ArrayList<Position> positions = new ArrayList<>();
+ for (int i = 0; i < entryCount; i++) {
+ positions.add(ml.addEntry(("entry-" + i).getBytes(Encoding)));
+ }
+ // Trigger the cursor ledger creating.
+ cursor.markDelete(positions.get(0));
+ assertTrue(cursor.getStats().getPersistLedgerSucceed() > 0);
+
+ // Mock cursor ledger write failed.
+ bkc.addEntryFailAfter(0, BKException.Code.NoBookieAvailableException);
+ // Trigger a failed writing of the cursor ledger, then wait the stat
of cursor to be "NoLedger".
+ cursor.markDelete(positions.get(1));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cursor.getState(), "NoLedger");
+ });
+ assertTrue(cursor.getStats().getPersistLedgerErrors() > 0);
+ long persistZookeeperSucceed1 =
cursor.getStats().getPersistZookeeperSucceed();
+ assertTrue(persistZookeeperSucceed1 > 0);
+
+ // Mock cursor ledger create failed.
+ bkc.failNow(BKException.Code.NoBookieAvailableException);
+ // Verify the cursor status will be persistent to ZK even if the
cursor ledger creation always fails.
+ Position lastEntry = positions.get(entryCount -1);
+ cursor.markDelete(lastEntry);
+ long persistZookeeperSucceed2 =
cursor.getStats().getPersistZookeeperSucceed();
+ assertTrue(persistZookeeperSucceed2 > persistZookeeperSucceed1);
+
+ // Assert persist mark deleted position to ZK was successful.
+ PositionImpl slowestReadPosition =
ml.getCursors().getSlowestReaderPosition();
+ assertTrue(slowestReadPosition.getLedgerId() >=
lastEntry.getLedgerId());
+ assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
Review Comment:
Same as the above comment
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -229,6 +229,76 @@ void readTwice() throws Exception {
entries.forEach(Entry::release);
}
+ @Test
+ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception
{
+ ManagedLedgerImpl ml =
+ (ManagedLedgerImpl) factory.open("ml_test", new
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+ ManagedCursor cursor = ml.openCursor("c1");
+ Position lastEntry = null;
+ for (int i = 0; i < 10; i++) {
+ lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
+ }
+
+ // Mock cursor ledger create failed.
+ bkc.failNow(BKException.Code.NoBookieAvailableException);
+
+ cursor.markDelete(lastEntry);
+
+ // Assert persist mark deleted position to ZK was successful.
+ PositionImpl slowestReadPosition =
ml.getCursors().getSlowestReaderPosition();
+ assertTrue(slowestReadPosition.getLedgerId() >=
lastEntry.getLedgerId());
+ assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+ assertEquals(cursor.getStats().getPersistLedgerSucceed(), 0);
+ assertTrue(cursor.getStats().getPersistZookeeperSucceed() > 0);
Review Comment:
Please add the following lines to make sure the mark delete position can be
recovered properly.
```java
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
ml.close();
ml = (ManagedLedgerImpl) factory.open("ml_test", new
ManagedLedgerConfig().setMaxEntriesPerLedger(1));
cursor = ml.openCursor("c1");
assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
```
##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -229,6 +229,28 @@ void readTwice() throws Exception {
entries.forEach(Entry::release);
}
+ @Test
+ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception
{
Review Comment:
Ah, got it. The cursor Ledger ID is updated to -1 after persisting to
ZooKeeper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]