This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6f74686e8f27ba4ef56ff05e8f3fdc83b3447426 Author: Lari Hotari <[email protected]> AuthorDate: Tue Apr 5 22:44:25 2022 +0300 [ML] Fix race condition in updating lastMarkDeleteEntry field (#15031) - missed updates can lead to the subscription and consuming getting stuck (cherry picked from commit ad2f397fb9bb1d6f90a980e68f0161ed32900309) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 31 ++++++++++++++++++---- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index cb4847ba679..87fe0b94c26 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1652,7 +1652,7 @@ public class ManagedCursorImpl implements ManagedCursor { // Apply rate limiting to mark-delete operations if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { isDirty = true; - lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, null, null); + updateLastMarkDeleteEntryToLatest(newPosition, properties); callback.markDeleteComplete(ctx); return; } @@ -1705,7 +1705,14 @@ public class ManagedCursorImpl implements ManagedCursor { // ledger is postponed to when the counter goes to 0. PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.incrementAndGet(this); - lastMarkDeleteEntry = mdEntry; + LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { + if (last != null && last.newPosition.compareTo(mdEntry.newPosition) > 0) { + // keep the current value since it's later then the mdEntry.newPosition + return last; + } else { + return mdEntry; + } + }); persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() { @Override @@ -1962,9 +1969,7 @@ public class ManagedCursorImpl implements ManagedCursor { // Apply rate limiting to mark-delete operations if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { isDirty = true; - PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition; - LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, - last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null)); + updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null); callback.deleteComplete(ctx); return; } @@ -1996,6 +2001,22 @@ public class ManagedCursorImpl implements ManagedCursor { } } + // update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition + private void updateLastMarkDeleteEntryToLatest(final PositionImpl newPosition, + final Map<String, Long> properties) { + LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { + if (last != null && last.newPosition.compareTo(newPosition) > 0) { + // keep current value, don't update + return last; + } else { + // use given properties or when missing, use the properties from the previous field value + Map<String, Long> propertiesToUse = + properties != null ? properties : (last != null ? last.properties : Collections.emptyMap()); + return new MarkDeleteEntry(newPosition, propertiesToUse, null, null); + } + }); + } + /** * Given a list of entries, filter out the entries that have already been individually deleted. *
