codelipenghui commented on code in PR #20935:
URL: https://github.com/apache/pulsar/pull/20935#discussion_r1288082053


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -3073,32 +3078,39 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
                 // in the meantime the mark-delete will be queued.
                 STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
 
-                mbean.persistToLedger(false);
-                // Before giving up, try to persist the position in the metadata store
-                persistPositionMetaStore(-1, position, mdEntry.properties, new MetaStoreCallback<Void>() {
-                    @Override
-                    public void operationComplete(Void result, Stat stat) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(
-                                    "[{}][{}] Updated cursor in meta store after previous failure in ledger at position"
-                                    + " {}", ledger.getName(), name, position);
-                        }
-                        mbean.persistToZookeeper(true);
-                        callback.operationComplete();
-                    }
-
-                    @Override
-                    public void operationFailed(MetaStoreException e) {
-                        log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
-                                ledger.getName(), name, e.getMessage());
-                        mbean.persistToZookeeper(false);
-                        callback.operationFailed(createManagedLedgerException(rc));
-                    }
-                }, true);
+                // Before giving up, try to persist the position in the metadata store.
+                persistPositionMetaStore(mdEntry, callback);
             }
         }, null);
     }
 
+    void persistPositionMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {

Review Comment:
   ```suggestion
       void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {
   ```
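
   For readers following the hunk above, here is a minimal, self-contained sketch of the fallback pattern it refactors: try the cursor ledger first and, if that fails, persist the position to the metadata store before giving up. This is not the Pulsar implementation. `CursorFallbackSketch`, `PositionStore`, `persistWithFallback`, `store`, and `run` are hypothetical names introduced only for illustration, and the local `VoidCallback` interface merely mimics the shape of the callback seen in the diff. As in the removed code, a failure of the fallback is assumed to report the original ledger error.

   ```java
   // Illustrative sketch only: all types here are stand-ins, not Pulsar classes.
   public class CursorFallbackSketch {

       // Local stand-in for the callback shape used in the diff.
       interface VoidCallback {
           void operationComplete();
           void operationFailed(Exception e);
       }

       // Hypothetical abstraction over "somewhere a position can be persisted".
       interface PositionStore {
           void persist(long ledgerId, long entryId, VoidCallback callback);
       }

       // Try the ledger store first; on failure, fall back to the metadata store.
       static void persistWithFallback(PositionStore ledgerStore, PositionStore metaStore,
                                       long ledgerId, long entryId, VoidCallback callback) {
           ledgerStore.persist(ledgerId, entryId, new VoidCallback() {
               @Override
               public void operationComplete() {
                   callback.operationComplete();
               }

               @Override
               public void operationFailed(Exception ledgerError) {
                   // Before giving up, try the metadata store.
                   metaStore.persist(ledgerId, entryId, new VoidCallback() {
                       @Override
                       public void operationComplete() {
                           callback.operationComplete();
                       }

                       @Override
                       public void operationFailed(Exception metaError) {
                           // Assumption: surface the original ledger failure.
                           callback.operationFailed(ledgerError);
                       }
                   });
               }
           });
       }

       // Fake store that either always succeeds or always fails.
       static PositionStore store(boolean succeeds, String name) {
           return (ledgerId, entryId, cb) -> {
               if (succeeds) {
                   cb.operationComplete();
               } else {
                   cb.operationFailed(new Exception(name + " unavailable"));
               }
           };
       }

       // Runs one persist attempt and reports the outcome as a string.
       static String run(boolean ledgerOk, boolean metaOk) {
           StringBuilder outcome = new StringBuilder();
           persistWithFallback(store(ledgerOk, "ledger"), store(metaOk, "metastore"), 3L, 9L,
                   new VoidCallback() {
                       @Override
                       public void operationComplete() {
                           outcome.append("ok");
                       }

                       @Override
                       public void operationFailed(Exception e) {
                           outcome.append("failed: ").append(e.getMessage());
                       }
                   });
           return outcome.toString();
       }

       public static void main(String[] args) {
           System.out.println(run(true, true));   // ledger write succeeds
           System.out.println(run(false, true));  // fallback succeeds
           System.out.println(run(false, false)); // both fail
       }
   }
   ```

   The caller sees a failure only when both stores fail, which is the property the new tests in this PR check through the `getPersistZookeeperSucceed()` counter.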



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -229,6 +229,76 @@ void readTwice() throws Exception {
         entries.forEach(Entry::release);
     }
 
+    @Test
+    void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
+        ManagedLedgerImpl ml =
+                (ManagedLedgerImpl) factory.open("ml_test", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+        ManagedCursor cursor = ml.openCursor("c1");
+        Position lastEntry = null;
+        for (int i = 0; i < 10; i++) {
+            lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+
+        // Mock cursor ledger create failed.
+        bkc.failNow(BKException.Code.NoBookieAvailableException);
+
+        cursor.markDelete(lastEntry);
+
+        // Assert persist mark deleted position to ZK was successful.
+        PositionImpl slowestReadPosition = ml.getCursors().getSlowestReaderPosition();
+        assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
+        assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+        assertEquals(cursor.getStats().getPersistLedgerSucceed(), 0);
+        assertTrue(cursor.getStats().getPersistZookeeperSucceed() > 0);
+
+        // cleanup.
+        ml.delete();
+    }
+
+    @Test
+    void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception {
+        ManagedLedgerImpl ml =
+                (ManagedLedgerImpl) factory.open("ml_test", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        final int entryCount = 10;
+
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
+        ArrayList<Position> positions = new ArrayList<>();
+        for (int i = 0; i < entryCount; i++) {
+            positions.add(ml.addEntry(("entry-" + i).getBytes(Encoding)));
+        }
+        // Trigger the cursor ledger creating.
+        cursor.markDelete(positions.get(0));
+        assertTrue(cursor.getStats().getPersistLedgerSucceed() > 0);
+
+        // Mock cursor ledger write failed.
+        bkc.addEntryFailAfter(0, BKException.Code.NoBookieAvailableException);
+        // Trigger a failed writing of the cursor ledger, then wait the stat of cursor to be "NoLedger".
+        cursor.markDelete(positions.get(1));
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cursor.getState(), "NoLedger");
+        });
+        assertTrue(cursor.getStats().getPersistLedgerErrors() > 0);
+        long persistZookeeperSucceed1 = cursor.getStats().getPersistZookeeperSucceed();
+        assertTrue(persistZookeeperSucceed1 > 0);
+
+        // Mock cursor ledger create failed.
+        bkc.failNow(BKException.Code.NoBookieAvailableException);
+        // Verify the cursor status will be persistent to ZK even if the cursor ledger creation always fails.
+        Position lastEntry = positions.get(entryCount -1);
+        cursor.markDelete(lastEntry);
+        long persistZookeeperSucceed2 = cursor.getStats().getPersistZookeeperSucceed();
+        assertTrue(persistZookeeperSucceed2 > persistZookeeperSucceed1);
+
+        // Assert persist mark deleted position to ZK was successful.
+        PositionImpl slowestReadPosition = ml.getCursors().getSlowestReaderPosition();
+        assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
+        assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());

Review Comment:
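   Same as the comment on `testPersistentMarkDeleteIfCreateCursorLedgerFailed`: please also verify that the mark-delete position can be recovered properly.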



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -229,6 +229,76 @@ void readTwice() throws Exception {
         entries.forEach(Entry::release);
     }
 
+    @Test
+    void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
+        ManagedLedgerImpl ml =
+                (ManagedLedgerImpl) factory.open("ml_test", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+
+        ManagedCursor cursor = ml.openCursor("c1");
+        Position lastEntry = null;
+        for (int i = 0; i < 10; i++) {
+            lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+
+        // Mock cursor ledger create failed.
+        bkc.failNow(BKException.Code.NoBookieAvailableException);
+
+        cursor.markDelete(lastEntry);
+
+        // Assert persist mark deleted position to ZK was successful.
+        PositionImpl slowestReadPosition = ml.getCursors().getSlowestReaderPosition();
+        assertTrue(slowestReadPosition.getLedgerId() >= lastEntry.getLedgerId());
+        assertTrue(slowestReadPosition.getEntryId() >= lastEntry.getEntryId());
+        assertEquals(cursor.getStats().getPersistLedgerSucceed(), 0);
+        assertTrue(cursor.getStats().getPersistZookeeperSucceed() > 0);

Review Comment:
   Please add the following lines to make sure the mark delete position can be recovered properly.
   
   ```java
           assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
   
           ml.close();
           ml = (ManagedLedgerImpl) factory.open("ml_test", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
           cursor = ml.openCursor("c1");
           assertEquals(cursor.getPersistentMarkDeletedPosition(), lastEntry);
   ```



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -229,6 +229,28 @@ void readTwice() throws Exception {
         entries.forEach(Entry::release);
     }
 
+    @Test
+    void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {

Review Comment:
   Ah, got it. The cursor ledger ID is updated to -1 after persisting to ZooKeeper.
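
   To make the point about the -1 ledger ID concrete, here is a toy model of how recovery can work once the position has been written to the metadata store. It is a sketch under stated assumptions, not Pulsar code: `CursorRecoverySketch`, `CursorInfo`, `persistToMetaStore`, and `recover` are hypothetical names, and the in-memory map stands in for ZooKeeper. The assumption illustrated is that a cursor ledger ID of -1 tells recovery to take the mark-delete position from the metadata record itself rather than from a cursor ledger.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Illustrative sketch only: a toy metadata store, not Pulsar's.
   public class CursorRecoverySketch {

       // Sentinel meaning "no cursor ledger; the position lives in the metadata record".
       static final long NO_LEDGER = -1L;

       // Hypothetical metadata record for one cursor.
       static final class CursorInfo {
           final long cursorLedgerId;
           final long markDeleteLedgerId;
           final long markDeleteEntryId;

           CursorInfo(long cursorLedgerId, long markDeleteLedgerId, long markDeleteEntryId) {
               this.cursorLedgerId = cursorLedgerId;
               this.markDeleteLedgerId = markDeleteLedgerId;
               this.markDeleteEntryId = markDeleteEntryId;
           }
       }

       // Stands in for the metadata store; it outlives a close/reopen of the managed ledger.
       static final Map<String, CursorInfo> metaStore = new HashMap<>();

       // Persisting directly to the metadata store resets the cursor ledger ID to -1.
       static void persistToMetaStore(String cursorName, long ledgerId, long entryId) {
           metaStore.put(cursorName, new CursorInfo(NO_LEDGER, ledgerId, entryId));
       }

       // Recover the mark-delete position as {ledgerId, entryId}.
       static long[] recover(String cursorName) {
           CursorInfo info = metaStore.get(cursorName);
           if (info == null) {
               throw new IllegalStateException("unknown cursor: " + cursorName);
           }
           if (info.cursorLedgerId == NO_LEDGER) {
               // No cursor ledger to read: the metadata record holds the position.
               return new long[] {info.markDeleteLedgerId, info.markDeleteEntryId};
           }
           // Reading the position back from a cursor ledger is out of scope for this sketch.
           throw new UnsupportedOperationException("cursor ledger recovery not modeled");
       }

       public static void main(String[] args) {
           persistToMetaStore("c1", 12L, 0L);
           long[] position = recover("c1");
           System.out.println(position[0] + ":" + position[1]);
       }
   }
   ```

   The reopen-and-compare assertions requested in the earlier comment exercise this path end to end in the real test.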



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
