poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205216997


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we 
do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, 
but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also 
work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the 
autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), 
name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 
1);
+                    positionsToAck = new ArrayList<>();
+                }

Review Comment:
   > I mean the asyncDelete method is also doing the same thing.
   > - Iterate the positions then update the memory state one by one.
   > - The mark delete rate limiter will update the changes to the storage 
layer in batch
   
   I think you are right, but the mechanism provided by the rate limiter can 
not save the logic(calculate the new markDeletePosition) below
   - 
https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2313-L2328
   - 
https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L1819-L1884
   
   Since the event ledger loss rarely happens, I feel your suggestion is better.
   
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to