This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new bb6f8360834 [fix][broker] Fix deadlock while skip non-recoverable 
ledgers. (#21915)
bb6f8360834 is described below

commit bb6f83608346b2a9b6abe5b4660fb7626493d176
Author: hrzzzz <[email protected]>
AuthorDate: Mon Jan 22 14:23:53 2024 +0800

    [fix][broker] Fix deadlock while skip non-recoverable ledgers. (#21915)
    
    ### Motivation
    The sequence of events leading to the deadlock when methods from 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl are invoked concurrently 
is as follows:
    
    1. Thread A calls asyncDelete, which then goes on to internally call 
internalAsyncMarkDelete. This results in acquiring a lock on 
pendingMarkDeleteOps through synchronized (pendingMarkDeleteOps).
    
    2. Inside internalAsyncMarkDelete, internalMarkDelete is called which 
subsequently calls persistPositionToLedger. At the start of 
persistPositionToLedger, buildIndividualDeletedMessageRanges is invoked, where 
it tries to acquire a read lock using lock.readLock().lock(). At this point, if 
the write lock is being held by another thread, Thread A will block waiting for 
the read lock.
    
    3. Concurrently, Thread B executes skipNonRecoverableLedger which first 
obtains a write lock using lock.writeLock().lock() and then proceeds to call 
asyncDelete.
    
    4. At this moment, Thread B already holds the write lock and is attempting 
to acquire the synchronized lock on pendingMarkDeleteOps that Thread A already 
holds, while Thread A is waiting for the read lock, which cannot be granted 
until Thread B releases the write lock.
    
    In code, the deadlock appears as follows:
    
    Thread A: synchronized (pendingMarkDeleteOps) -> lock.readLock().lock() 
(waiting)
    Thread B: lock.writeLock().lock() -> synchronized (pendingMarkDeleteOps) 
(waiting)
    
    ### Modifications
    
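    Avoid holding a long-lived lock: skipNonRecoverableLedger no longer 
acquires lock.writeLock() around the per-entry loop, and instead passes all 
positions of the lost ledger to a single asyncDelete call.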
    
    Co-authored-by: ruihongzhou <[email protected]>
    Co-authored-by: Jiwe Guo <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 36 +++++++++-------------
 1 file changed, 15 insertions(+), 21 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4b65d62f0ee..d78a28dd165 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.LongStream;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -2784,30 +2785,23 @@ public class ManagedCursorImpl implements ManagedCursor 
{
         if (ledgerInfo == null) {
             return;
         }
-        lock.writeLock().lock();
         log.warn("[{}] [{}] Since the ledger [{}] is lost and the 
autoSkipNonRecoverableData is true, this ledger will"
                 + " be auto acknowledge in subscription", ledger.getName(), 
name, ledgerId);
-        try {
-            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
-                if (!individualDeletedMessages.contains(ledgerId, i)) {
-                    asyncDelete(PositionImpl.get(ledgerId, i), new 
AsyncCallbacks.DeleteCallback() {
-                        @Override
-                        public void deleteComplete(Object ctx) {
-                            // ignore.
-                        }
+        asyncDelete(() -> LongStream.range(0, ledgerInfo.getEntries())
+                        .mapToObj(i -> (Position) PositionImpl.get(ledgerId, 
i)).iterator(),
+                new AsyncCallbacks.DeleteCallback() {
+                    @Override
+                    public void deleteComplete(Object ctx) {
+                        // ignore.
+                    }
 
-                        @Override
-                        public void deleteFailed(ManagedLedgerException ex, 
Object ctx) {
-                            // The method internalMarkDelete already handled 
the failure operation. We only need to
-                            // make sure the memory state is updated.
-                            // If the broker crashed, the non-recoverable 
ledger will be detected again.
-                        }
-                    }, null);
-                }
-            }
-        } finally {
-            lock.writeLock().unlock();
-        }
+                    @Override
+                    public void deleteFailed(ManagedLedgerException ex, Object 
ctx) {
+                        // The method internalMarkDelete already handled the 
failure operation. We only need to
+                        // make sure the memory state is updated.
+                        // If the broker crashed, the non-recoverable ledger 
will be detected again.
+                    }
+                }, null);
     }
 
     // //////////////////////////////////////////////////

Reply via email to