This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c9099267a85660a02e61f67594640df67c01f0fd
Author: 道君- Tao Jiuming <[email protected]>
AuthorDate: Fri May 23 09:38:45 2025 +0800

    [fix][ml] Fix ManagedCursorImpl.individualDeletedMessages concurrent issue (#24338)
    
    (cherry picked from commit 376ae57e52a3d9557ff26f5c184e0bb79b22fb37)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java  | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ca10c10b83a..b1ebc097492 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -631,6 +631,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
         } else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) {
             List<LongListMap> rangeList = 
positionInfo.getIndividualDeletedMessageRangesList();
+            lock.writeLock().lock();
             try {
                 Map<Long, long[]> rangeMap = 
rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
                         list -> list.getValuesList().stream().mapToLong(i -> 
i).toArray()));
@@ -653,6 +654,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             } catch (Exception e) {
                 log.warn("[{}]-{} Failed to recover individualDeletedMessages 
from serialized data", ledger.getName(),
                         name, e);
+            } finally {
+                lock.writeLock().unlock();
             }
         }
     }
@@ -3200,10 +3203,13 @@ public class ManagedCursorImpl implements ManagedCursor {
          * and deserialization error.
          */
         if (getConfig().isUnackedRangesOpenCacheSetEnabled() && 
getConfig().isPersistIndividualAckAsLongArray()) {
+            lock.readLock().lock();
             try {
                 internalRanges = 
individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
             } catch (Exception e) {
                 log.warn("[{}]-{} Failed to serialize 
individualDeletedMessages", ledger.getName(), name, e);
+            } finally {
+                lock.readLock().unlock();
             }
         }
         if (internalRanges != null && !internalRanges.isEmpty()) {

Reply via email to