poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205161925


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }
+            }
+            // Acknowledge the last segments.
+            if (!positionsToAck.isEmpty()) {
+                retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void retryToAcknowledgeNonRecoverablePositions(List<Position> positions, int retryTimes) {
+        if (CollectionUtils.isEmpty(positions)) {
+            return;
+        }
+        asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                // ignore.
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException ex, Object ctx) {
+                if (retryTimes <= 3) {
+                    log.warn("[{}] [{}] Try to acknowledge the non recoverable positions fail and it will be retry"
+                                    + " after 60s. ledgerId: {}, the current retry times: {}",
+                            ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex);
+                    ledger.getScheduledExecutor()
+                            .schedule(() -> retryToAcknowledgeNonRecoverablePositions(positions, retryTimes + 1),
+                                    60, TimeUnit.SECONDS);
+                } else {
+                    log.error("[{}] [{}] Try to acknowledge the non recoverable positions ultimately failed."
+                                    + " ledgerId: {}, retry times: {}",
+                            ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex);
+                }
+            }
+        }, retryTimes);

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to