This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c9099267a85660a02e61f67594640df67c01f0fd
Author: 道君- Tao Jiuming <[email protected]>
AuthorDate: Fri May 23 09:38:45 2025 +0800

    [fix][ml] Fix ManagedCursorImpl.individualDeletedMessages concurrent issue (#24338)

    (cherry picked from commit 376ae57e52a3d9557ff26f5c184e0bb79b22fb37)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ca10c10b83a..b1ebc097492 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -631,6 +631,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
         } else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) {
             List<LongListMap> rangeList = positionInfo.getIndividualDeletedMessageRangesList();
+            lock.writeLock().lock();
             try {
                 Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
                         list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
@@ -653,6 +654,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             } catch (Exception e) {
                 log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data",
                         ledger.getName(), name, e);
+            } finally {
+                lock.writeLock().unlock();
             }
         }
     }
@@ -3200,10 +3203,13 @@ public class ManagedCursorImpl implements ManagedCursor {
          * and deserialization error.
          */
         if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) {
+            lock.readLock().lock();
             try {
                 internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
             } catch (Exception e) {
                 log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
+            } finally {
+                lock.readLock().unlock();
             }
         }
         if (internalRanges != null && !internalRanges.isEmpty()) {
