codelipenghui commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205193799


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we 
do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, 
but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also 
work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the 
autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), 
name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 
1);
+                    positionsToAck = new ArrayList<>();
+                }

Review Comment:
   I mean the `asyncDelete` method is also doing the same thing. 
   
   - Iterate the positions then update the memory state one by one.
   - The mark delete rate limiter will update the changes to the storage layer 
in batch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to