This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 895e96853e5 [improve][ml] Avoid repetitive nested lock for isMessageDeleted in ManagedCursorImpl (#23609)
895e96853e5 is described below

commit 895e96853e56b7d9c10c6bd0f3bd6105b664eb14
Author: sinan liu <[email protected]>
AuthorDate: Wed Nov 20 00:24:46 2024 +0800

    [improve][ml] Avoid repetitive nested lock for isMessageDeleted in ManagedCursorImpl (#23609)
---
 .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java  | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7c0d13108b1..478c6a1b379 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1569,7 +1569,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
         lock.readLock().lock();
         try {
-            positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
+            positions.stream().filter(this::internalIsMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
         } finally {
             lock.readLock().unlock();
         }
@@ -2345,7 +2345,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     return;
                 }
 
-                if (isMessageDeleted(position)) {
+                if (internalIsMessageDeleted(position)) {
                     if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
                         BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
                         if (bitSetRecyclable != null) {
@@ -3543,13 +3543,19 @@ public class ManagedCursorImpl implements ManagedCursor {
     public boolean isMessageDeleted(Position position) {
         lock.readLock().lock();
         try {
-            return position.compareTo(markDeletePosition) <= 0
-                    || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
+            return internalIsMessageDeleted(position);
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    // When this method is called while the external has already acquired a write lock or a read lock,
+    // it avoids unnecessary lock nesting.
+    private boolean internalIsMessageDeleted(Position position) {
+        return position.compareTo(markDeletePosition) <= 0
+                || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
+    }
+
     //this method will return a copy of the position's ack set
     @Override
     public long[] getBatchPositionAckSet(Position position) {

Reply via email to