This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new bb6f8360834 [fix][broker] Fix deadlock while skip non-recoverable ledgers. (#21915)
bb6f8360834 is described below
commit bb6f83608346b2a9b6abe5b4660fb7626493d176
Author: hrzzzz <[email protected]>
AuthorDate: Mon Jan 22 14:23:53 2024 +0800
[fix][broker] Fix deadlock while skip non-recoverable ledgers. (#21915)
### Motivation
The deadlock occurs when methods of
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl are invoked concurrently,
in the following sequence:
1. Thread A calls asyncDelete, which internally calls internalAsyncMarkDelete.
This acquires the monitor of pendingMarkDeleteOps through
synchronized (pendingMarkDeleteOps).
2. Inside internalAsyncMarkDelete, internalMarkDelete is called, which in turn
calls persistPositionToLedger. At the start of persistPositionToLedger,
buildIndividualDeletedMessageRanges is invoked, which tries to acquire the read
lock using lock.readLock().lock(). If another thread holds the write lock at
this point, Thread A blocks waiting for the read lock while still holding the
pendingMarkDeleteOps monitor.
3. Concurrently, Thread B executes skipNonRecoverableLedger, which first
acquires the write lock using lock.writeLock().lock() and then calls
asyncDelete while still holding it.
4. Thread B now holds the write lock and, through asyncDelete, tries to enter
synchronized (pendingMarkDeleteOps), whose monitor Thread A holds. Thread A, in
turn, waits for the read lock, which cannot be granted until Thread B releases
the write lock. Neither thread can make progress.
In code, the deadlock appears as follows:
Thread A: synchronized (pendingMarkDeleteOps) -> lock.readLock().lock() (waiting)
Thread B: lock.writeLock().lock() -> synchronized (pendingMarkDeleteOps) (waiting)
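The same lock-order inversion can be reproduced outside Pulsar using only a plain monitor and a ReentrantReadWriteLock. The sketch below is hypothetical. LockOrderInversionDemo and its fields are illustrative names, not Pulsar code. `monitor` stands in for pendingMarkDeleteOps and `lock` stands in for the cursor's read-write lock. Two latches force the interleaving from steps 1 to 4. Thread A calls readLock().tryLock with a timeout instead of lock(), so the program terminates rather than hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical reproduction of the lock-order inversion (not Pulsar code).
 * "monitor" plays pendingMarkDeleteOps; "lock" plays the cursor's read-write lock.
 */
public class LockOrderInversionDemo {
    private final Object monitor = new Object();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Returns true if Thread A obtained the read lock while holding the monitor. */
    public static boolean threadAGotReadLock() throws InterruptedException {
        LockOrderInversionDemo demo = new LockOrderInversionDemo();
        CountDownLatch aHoldsMonitor = new CountDownLatch(1);
        CountDownLatch bHoldsWriteLock = new CountDownLatch(1);
        boolean[] result = new boolean[1];

        // Thread A: synchronized (monitor) -> read lock
        Thread a = new Thread(() -> {
            synchronized (demo.monitor) {
                aHoldsMonitor.countDown();
                try {
                    bHoldsWriteLock.await();
                    // With readLock().lock() this call would block forever.
                    boolean got = demo.lock.readLock().tryLock(200, TimeUnit.MILLISECONDS);
                    if (got) {
                        demo.lock.readLock().unlock();
                    }
                    result[0] = got;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Thread B: write lock -> synchronized (monitor)
        Thread b = new Thread(() -> {
            demo.lock.writeLock().lock();
            try {
                bHoldsWriteLock.countDown();
                aHoldsMonitor.await();
                synchronized (demo.monitor) {
                    // Reached only after Thread A times out and releases the monitor.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                demo.lock.writeLock().unlock();
            }
        });

        a.start();
        b.start();
        a.join(); // join() makes result[0] visible to this thread
        b.join();
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Thread A acquired read lock: " + threadAGotReadLock());
    }
}
```

Running main prints `Thread A acquired read lock: false`. Thread B holds the write lock, so Thread A cannot obtain the read lock. Thread B cannot enter the monitor until Thread A gives up. With readLock().lock() in place of tryLock, both threads would wait forever.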
### Modifications
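Avoid holding a long-range lock: skipNonRecoverableLedger no longer acquires
lock.writeLock() around the whole loop. Instead, it passes all positions of the
lost ledger to a single asyncDelete call as a lazily built Iterable<Position>
(via LongStream.range), so the caller holds no lock when asyncDelete runs.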
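The shape of the fix can be illustrated with a heavily simplified, hypothetical cursor. NarrowLockCursorSketch, skipLedger, markDeleteFromOtherThread, and runConcurrently are illustrative names, and positions are plain `long` entry ids. The sketch assumes that asyncDelete takes the write lock only while updating its in-memory state and releases it before entering synchronized (pendingMarkDeleteOps). The real ManagedCursorImpl body is not part of this diff. Because the caller no longer wraps the call in lock.writeLock(), no thread holds the write lock while waiting for the monitor, so the cycle from the Motivation cannot form.

```java
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.LongStream;

/** Hypothetical, simplified cursor showing the narrow-lock pattern; not Pulsar's implementation. */
public class NarrowLockCursorSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Object pendingMarkDeleteOps = new Object();
    private final TreeSet<Long> individualDeletedMessages = new TreeSet<>(); // guarded by lock
    private long lastPersisted = -1; // guarded by pendingMarkDeleteOps

    /** Assumed shape: write lock only around the in-memory update, released before the monitor. */
    public void asyncDelete(Iterable<Long> positions) {
        long newMarkDelete;
        lock.writeLock().lock();
        try {
            for (Long position : positions) {
                individualDeletedMessages.add(position); // re-adding an existing id is a no-op
            }
            // Simplified stand-in for computing the new mark-delete position.
            newMarkDelete = individualDeletedMessages.isEmpty() ? -1 : individualDeletedMessages.last();
        } finally {
            lock.writeLock().unlock();
        }
        // Entered with no read/write lock held: "write lock -> monitor" never happens.
        synchronized (pendingMarkDeleteOps) {
            persistPosition(newMarkDelete);
        }
    }

    /** Thread A's path from the Motivation: monitor first, then the read lock. */
    public void markDeleteFromOtherThread(long position) {
        synchronized (pendingMarkDeleteOps) {
            persistPosition(position);
        }
    }

    // Always called with the monitor held; takes the read lock like persistPositionToLedger.
    private void persistPosition(long position) {
        lock.readLock().lock();
        try {
            lastPersisted = position;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** After the fix: no lock.writeLock() around the call, one asyncDelete for the whole ledger. */
    public void skipLedger(long entries) {
        asyncDelete(() -> LongStream.range(0, entries).boxed().iterator());
    }

    public int deletedCount() {
        lock.readLock().lock();
        try {
            return individualDeletedMessages.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    public long lastPersisted() {
        synchronized (pendingMarkDeleteOps) {
            return lastPersisted;
        }
    }

    public boolean isWriteLocked() {
        return lock.isWriteLocked();
    }

    /** Runs both lock paths concurrently; true if every task finished within the timeout. */
    public static boolean runConcurrently() throws InterruptedException {
        NarrowLockCursorSketch cursor = new NarrowLockCursorSketch();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1_000; i++) {
            final long position = i;
            pool.submit(() -> cursor.skipLedger(10));
            pool.submit(() -> cursor.markDeleteFromOtherThread(position));
        }
        pool.shutdown();
        return pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The lambda passed to asyncDelete mirrors the one in the diff. Iterable has a single abstract method, iterator(), so `() -> LongStream.range(...).boxed().iterator()` is a valid Iterable. It builds a fresh stream on every iteration instead of materializing all positions up front.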
Co-authored-by: ruihongzhou <[email protected]>
Co-authored-by: Jiwe Guo <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 36 +++++++++-------------
1 file changed, 15 insertions(+), 21 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4b65d62f0ee..d78a28dd165 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -59,6 +59,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.LongStream;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -2784,30 +2785,23 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (ledgerInfo == null) {
             return;
         }
-        lock.writeLock().lock();
         log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
                 + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
-        try {
-            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
-                if (!individualDeletedMessages.contains(ledgerId, i)) {
-                    asyncDelete(PositionImpl.get(ledgerId, i), new AsyncCallbacks.DeleteCallback() {
-                        @Override
-                        public void deleteComplete(Object ctx) {
-                            // ignore.
-                        }
+        asyncDelete(() -> LongStream.range(0, ledgerInfo.getEntries())
+                        .mapToObj(i -> (Position) PositionImpl.get(ledgerId, i)).iterator(),
+                new AsyncCallbacks.DeleteCallback() {
+                    @Override
+                    public void deleteComplete(Object ctx) {
+                        // ignore.
+                    }

-                        @Override
-                        public void deleteFailed(ManagedLedgerException ex, Object ctx) {
-                            // The method internalMarkDelete already handled the failure operation. We only need to
-                            // make sure the memory state is updated.
-                            // If the broker crashed, the non-recoverable ledger will be detected again.
-                        }
-                    }, null);
-                }
-            }
-        } finally {
-            lock.writeLock().unlock();
-        }
+                    @Override
+                    public void deleteFailed(ManagedLedgerException ex, Object ctx) {
+                        // The method internalMarkDelete already handled the failure operation. We only need to
+                        // make sure the memory state is updated.
+                        // If the broker crashed, the non-recoverable ledger will be detected again.
+                    }
+                }, null);
     }

     // //////////////////////////////////////////////////