poorbarcode commented on code in PR #18620: URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205161925
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java: ########## @@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) { } } + /** + * Manually acknowledge all entries in the lost ledger. + * - Since this is an uncommon event, we focus on maintainability. So we do not modify + * {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call + * {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}. + * - This method is valid regardless of the consumer ACK type. + * - If there is a consumer ack request after this event, it will also work. + */ + @Override + public void noticeNonRecoverableLedgerSkipped(final long ledgerId){ + LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId); + if (ledgerInfo == null) { + return; + } + lock.writeLock().lock(); + log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will" + + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId); + try { + List<Position> positionsToAck = new ArrayList<>(); + for (int i = 0; i < ledgerInfo.getEntries(); i++) { + if (!individualDeletedMessages.contains(ledgerId, i)) { + positionsToAck.add(PositionImpl.get(ledgerId, i)); + } + // Acknowledge in segments to avoid OOM. + if (positionsToAck.size() >= 1000) { + retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1); + positionsToAck = new ArrayList<>(); + } + } + // Acknowledge the last segments. + if (!positionsToAck.isEmpty()) { + retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1); + } + } finally { + lock.writeLock().unlock(); + } + } + + private void retryToAcknowledgeNonRecoverablePositions(List<Position> positions, int retryTimes) { + if (CollectionUtils.isEmpty(positions)) { + return; + } + asyncDelete(positions, new AsyncCallbacks.DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + // ignore. + } + + @Override + public void deleteFailed(ManagedLedgerException ex, Object ctx) { + if (retryTimes <= 3) { + log.warn("[{}] [{}] Try to acknowledge the non recoverable positions fail and it will be retry" + + " after 60s. ledgerId: {}, the current retry times: {}", + ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex); + ledger.getScheduledExecutor() + .schedule(() -> retryToAcknowledgeNonRecoverablePositions(positions, retryTimes + 1), + 60, TimeUnit.SECONDS); + } else { + log.error("[{}] [{}] Try to acknowledge the non recoverable positions ultimately failed." + + " ledgerId: {}, retry times: {}", + ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex); + } + } + }, retryTimes); Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org