This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5bb18eeca3349cc4dbe09d9d07b863227e1b9770 Author: Ruimin MA <[email protected]> AuthorDate: Fri Jan 2 22:38:59 2026 +0800 [improve][broker] Fix thread safety issue in ManagedCursorImpl.removeProperty (#25104) (cherry picked from commit 68d377937bc4e3265e7858abab630ff39969f774) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 9 +- .../bookkeeper/mledger/impl/ManagedCursorTest.java | 114 +++++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 397aee49efc..26df4118251 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -523,7 +523,14 @@ public class ManagedCursorImpl implements ManagedCursor { LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> { Map<String, Long> properties = last.properties; if (properties != null && properties.containsKey(key)) { - properties.remove(key); + Map<String, Long> newProperties = new HashMap<>(properties); + newProperties.remove(key); + + MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties, + last.callback, last.ctx); + newLastMarkDeleteEntry.callbackGroup = last.callbackGroup; + + return newLastMarkDeleteEntry; } return last; }); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 7523934ec10..5e367e303d9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -50,6 +50,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; @@ -57,6 +58,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -72,6 +74,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -206,6 +209,117 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(cursor.getMarkDeletedPosition(), ledger.getLastConfirmedEntry()); } + @Test + public void testConcurrentPropertyOperationsThreadSafety() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + ManagedLedger ledger = factory.open("testConcurrentPropertyOperationsThreadSafety", config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c_concurrent", null); + + int threadCount = 100; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + + // Collect all Future objects to ensure all tasks complete before verification + List<Future<?>> allFutures = new ArrayList<>(); + + // Use fixed number of operations + int totalOperations = 10000; + AtomicLong completedOperations = new AtomicLong(0); + AtomicLong exceptionCount = new AtomicLong(0); + AtomicBoolean inconsistencyDetected = new AtomicBoolean(false); + + // Submit fixed number of concurrent tasks + for (int i = 0; i < totalOperations; i++) { + Future<?> future = executor.submit(() -> { + try { + Random random = new Random(); + int operationType = random.nextInt(3); + String randomKey = "key" + random.nextInt(20); + + switch (operationType) { + case 0: // Put operation + Long randomValue = random.nextLong(); + cursor.putProperty(randomKey, randomValue); + break; + + case 1: // Remove operation + cursor.removeProperty(randomKey); + break; + + case 2: // Read and verify operation + Map<String, Long> properties = cursor.getProperties(); + // Verify no inconsistent state (key exists but value is null) + if (properties.containsKey(randomKey) && properties.get(randomKey) == null) { + inconsistencyDetected.set(true); + fail("INCONSISTENT STATE DETECTED: Key '" + randomKey + "' exists but has null value"); + } + break; + } + completedOperations.incrementAndGet(); + + } catch (ConcurrentModificationException cme) { + // Record ConcurrentModificationException but don't fail immediately + // We'll assert at the end that no exceptions occurred + exceptionCount.incrementAndGet(); + } catch (Exception e) { + exceptionCount.incrementAndGet(); + fail("Unexpected exception: " + e.getMessage()); + } + }); + + allFutures.add(future); + } + executor.shutdown(); + + // Wait for each task to complete with timeout + for (Future<?> future : allFutures) { + try { + future.get(30, TimeUnit.SECONDS); + } catch (TimeoutException e) { + fail("Task timed out after 30 seconds - possible deadlock or infinite loop"); + } catch (ExecutionException e) { + fail("unexpected exception: " + e.getCause()); + } + } + + // Ensure executor is fully terminated + boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS); + assertTrue(terminated, "Executor should be fully terminated"); + + // Then: Verify test results + // 1. No ConcurrentModificationException should occur with fixed code + assertEquals(exceptionCount.get(), 0, "No exceptions should occur with thread-safe implementation"); + + // 2. No inconsistent states detected + assertFalse(inconsistencyDetected.get(), "No inconsistent states (key with null value) should be detected"); + + // 3: Final cursor state should be internally consistent + Map<String, Long> finalProperties = cursor.getProperties(); + try { + for (Map.Entry<String, Long> entry : finalProperties.entrySet()) { + String key = entry.getKey(); + Long value = entry.getValue(); + // Verify key is not null + assertNotNull(key, "Final key should not be null"); + // Verify value is not null + assertNotNull(value, "Final value should not be null for key: " + key); + // Verify key follows expected pattern + assertTrue(key.startsWith("key"), + "Final key should start with 'key', got: " + key); + + // Verify key format is valid (key followed by a number 0-19) + try { + String numberPart = key.substring(3); // Remove "key" prefix + int keyNumber = Integer.parseInt(numberPart); + assertTrue(keyNumber >= 0 && keyNumber < 20, + "Key number should be between 0 and 19, got: " + keyNumber); + } catch (NumberFormatException e) { + fail("Invalid key format: " + key + ". Should be 'keyX' where X is a number"); + } + } + } catch (Exception e) { + fail("HashMap corruption detected in final state: " + e.getMessage()); + } + } private static void closeCursorLedger(ManagedCursorImpl managedCursor) { Awaitility.await().until(managedCursor::closeCursorLedger); }
