codelipenghui commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205099793


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -786,6 +786,12 @@ Set<? extends Position> asyncReplayEntries(
      */
     long getEstimatedSizeSinceMarkDeletePosition();
 
+    /**
+     * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
+     * used to delete information about this ledger in the ManagedCursor.
+     */
+    void noticeNonRecoverableLedgerSkipped(long ledgerId);

Review Comment:
   This file carries the following annotations:
   
   ```
   @InterfaceAudience.LimitedPrivate
   @InterfaceStability.Stable
   ```
   
   So we should not break existing implementations. Adding a new abstract
   method to the interface forces every existing implementation to change.
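   
   To illustrate the compatibility concern, here is a minimal sketch of one
   common way to extend a stable interface without breaking implementers: a
   `default` method. The types `Cursor`, `LegacyCursor`, and `TrackingCursor`
   are hypothetical stand-ins, not the actual Pulsar classes, and the no-op
   default body is an assumption rather than what this PR does.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical stand-in for a stable interface such as ManagedCursor.
   interface Cursor {
       long getEstimatedSize();
   
       // New hook added as a default method: implementations written before
       // this method existed still compile and inherit the no-op body.
       default void noticeNonRecoverableLedgerSkipped(long ledgerId) {
           // no-op by default (assumption for this sketch)
       }
   }
   
   // An implementation written before the new method was added.
   class LegacyCursor implements Cursor {
       @Override
       public long getEstimatedSize() {
           return 0L;
       }
   }
   
   // An implementation that opts in by overriding the default.
   class TrackingCursor implements Cursor {
       final List<Long> skipped = new ArrayList<>();
   
       @Override
       public long getEstimatedSize() {
           return 0L;
       }
   
       @Override
       public void noticeNonRecoverableLedgerSkipped(long ledgerId) {
           skipped.add(ledgerId);
       }
   }
   ```
   
   With an abstract method instead of a `default` one, `LegacyCursor` would
   fail to compile until it added an implementation.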



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java:
##########
@@ -631,6 +631,12 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
 
+    /**
+     * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
+     * used to delete information about this ledger in the ManagedCursor.
+     */
+    void noticeToCursorNonRecoverableLedgerSkipped(long ledgerId);

Review Comment:
   Same as the above comment.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -786,6 +786,12 @@ Set<? extends Position> asyncReplayEntries(
      */
     long getEstimatedSizeSinceMarkDeletePosition();
 
+    /**
+     * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
+     * used to delete information about this ledger in the ManagedCursor.
+     */
+    void noticeNonRecoverableLedgerSkipped(long ledgerId);

Review Comment:
   Maybe we can just use the name `skipNonRecoverableLedger(long ledgerId)`.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }

Review Comment:
   The managed cursor already has an entry delete limiter. Can we call
   `asyncDelete` directly?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1741,6 +1741,15 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    @Override
+    public void noticeToCursorNonRecoverableLedgerSkipped(long ledgerId){
+        Iterator<ManagedCursor> managedCursorIterator = cursors.iterator();
+        while (managedCursorIterator.hasNext()){
+            ManagedCursor managedCursor = managedCursorIterator.next();
+            managedCursor.noticeNonRecoverableLedgerSkipped(ledgerId);
+        }

Review Comment:
   ```suggestion
           for (ManagedCursor managedCursor : cursors) {
               managedCursor.noticeNonRecoverableLedgerSkipped(ledgerId);
           }
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }
+            }
+            // Acknowledge the last segments.
+            if (!positionsToAck.isEmpty()) {
+                retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void retryToAcknowledgeNonRecoverablePositions(List<Position> positions, int retryTimes) {
+        if (CollectionUtils.isEmpty(positions)) {
+            return;
+        }
+        asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                // ignore.
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException ex, Object ctx) {
+                if (retryTimes <= 3) {
+                    log.warn("[{}] [{}] Try to acknowledge the non recoverable positions fail and it will be retry"
+                                    + " after 60s. ledgerId: {}, the current retry times: {}",
+                            ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex);
+                    ledger.getScheduledExecutor()
+                            .schedule(() -> retryToAcknowledgeNonRecoverablePositions(positions, retryTimes + 1),
+                                    60, TimeUnit.SECONDS);
+                } else {
+                    log.error("[{}] [{}] Try to acknowledge the non recoverable positions ultimately failed."
+                                    + " ledgerId: {}, retry times: {}",
+                            ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex);
+                }
+            }
+        }, retryTimes);

Review Comment:
   The method `internalMarkDelete` already handles the failure case, so why
   do we need to handle it again?
   We only need to make sure the in-memory state is updated. A failure to
   persist the ack state is fine, right? Subsequent ack operations will try
   again.
   
   If the broker crashes, will the non-recoverable ledger be detected again?
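   
   The reasoning above can be sketched with a toy model: acknowledgements
   update the in-memory state first, and a failed persistence attempt is
   tolerated because the next ack persists the full in-memory state again.
   All names here (`ToyCursor`, `Store`, `ack`) are hypothetical. This is not
   how `ManagedCursorImpl` or `internalMarkDelete` is implemented, only an
   illustration of the argument.
   
   ```java
   import java.util.Set;
   import java.util.TreeSet;
   
   // Toy model only: not Pulsar's ManagedCursorImpl.
   class ToyCursor {
       // Hypothetical persistence hook that may fail.
       interface Store {
           void persist(Set<Long> acked) throws Exception;
       }
   
       private final Set<Long> ackedInMemory = new TreeSet<>();
       private Set<Long> lastPersisted = new TreeSet<>();
       private final Store store;
   
       ToyCursor(Store store) {
           this.store = store;
       }
   
       // Update memory first, then try to persist the whole in-memory state.
       void ack(long entryId) {
           ackedInMemory.add(entryId);
           try {
               store.persist(new TreeSet<>(ackedInMemory));
               lastPersisted = new TreeSet<>(ackedInMemory);
           } catch (Exception e) {
               // No dedicated retry: a later ack persists this entry as well.
           }
       }
   
       Set<Long> inMemory() {
           return ackedInMemory;
       }
   
       Set<Long> persisted() {
           return lastPersisted;
       }
   }
   ```
   
   In this model a crash before the next successful persist loses the
   unpersisted acks, which is why the question of whether the lost ledger is
   detected again after a restart matters.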



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
