This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 68d377937bc [improve][broker] Fix thread safety issue in ManagedCursorImpl.removeProperty (#25104)
68d377937bc is described below
commit 68d377937bc4e3265e7858abab630ff39969f774
Author: Ruimin MA <[email protected]>
AuthorDate: Fri Jan 2 22:38:59 2026 +0800
[improve][broker] Fix thread safety issue in ManagedCursorImpl.removeProperty (#25104)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 9 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 114 +++++++++++++++++++++
2 files changed, 122 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index c0f29ea5c22..ab15a6d6a17 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -524,7 +524,14 @@ public class ManagedCursorImpl implements ManagedCursor {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
Map<String, Long> properties = last.properties;
if (properties != null && properties.containsKey(key)) {
- properties.remove(key);
+ Map<String, Long> newProperties = new HashMap<>(properties);
+ newProperties.remove(key);
+
+ MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties,
+ last.callback, last.ctx);
+ newLastMarkDeleteEntry.callbackGroup = last.callbackGroup;
+
+ return newLastMarkDeleteEntry;
}
return last;
});
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 123a5cf048d..c87477a95a0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -50,6 +50,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
+import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -57,6 +58,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -72,6 +74,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -208,6 +211,117 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
assertEquals(cursor.getMarkDeletedPosition(), ledger.getLastConfirmedEntry());
}
+ @Test
+ public void testConcurrentPropertyOperationsThreadSafety() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ ManagedLedger ledger = factory.open("testConcurrentPropertyOperationsThreadSafety", config);
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c_concurrent", null);
+
+ int threadCount = 100;
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ // Collect all Future objects to ensure all tasks complete before verification
+ List<Future<?>> allFutures = new ArrayList<>();
+
+ // Use fixed number of operations
+ int totalOperations = 10000;
+ AtomicLong completedOperations = new AtomicLong(0);
+ AtomicLong exceptionCount = new AtomicLong(0);
+ AtomicBoolean inconsistencyDetected = new AtomicBoolean(false);
+
+ // Submit fixed number of concurrent tasks
+ for (int i = 0; i < totalOperations; i++) {
+ Future<?> future = executor.submit(() -> {
+ try {
+ Random random = new Random();
+ int operationType = random.nextInt(3);
+ String randomKey = "key" + random.nextInt(20);
+
+ switch (operationType) {
+ case 0: // Put operation
+ Long randomValue = random.nextLong();
+ cursor.putProperty(randomKey, randomValue);
+ break;
+
+ case 1: // Remove operation
+ cursor.removeProperty(randomKey);
+ break;
+
+ case 2: // Read and verify operation
+ Map<String, Long> properties = cursor.getProperties();
+ // Verify no inconsistent state (key exists but value is null)
+ if (properties.containsKey(randomKey) && properties.get(randomKey) == null) {
+ inconsistencyDetected.set(true);
+ fail("INCONSISTENT STATE DETECTED: Key '" + randomKey + "' exists but has null value");
+ }
+ break;
+ }
+ completedOperations.incrementAndGet();
+
+ } catch (ConcurrentModificationException cme) {
+ // Record ConcurrentModificationException but don't fail immediately
+ // We'll assert at the end that no exceptions occurred
+ exceptionCount.incrementAndGet();
+ } catch (Exception e) {
+ exceptionCount.incrementAndGet();
+ fail("Unexpected exception: " + e.getMessage());
+ }
+ });
+
+ allFutures.add(future);
+ }
+ executor.shutdown();
+
+ // Wait for each task to complete with timeout
+ for (Future<?> future : allFutures) {
+ try {
+ future.get(30, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fail("Task timed out after 30 seconds - possible deadlock or infinite loop");
+ } catch (ExecutionException e) {
+ fail("unexpected exception: " + e.getCause());
+ }
+ }
+
+ // Ensure executor is fully terminated
+ boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
+ assertTrue(terminated, "Executor should be fully terminated");
+
+ // Then: Verify test results
+ // 1. No ConcurrentModificationException should occur with fixed code
+ assertEquals(exceptionCount.get(), 0, "No exceptions should occur with thread-safe implementation");
+
+ // 2. No inconsistent states detected
+ assertFalse(inconsistencyDetected.get(), "No inconsistent states (key with null value) should be detected");
+
+ // 3: Final cursor state should be internally consistent
+ Map<String, Long> finalProperties = cursor.getProperties();
+ try {
+ for (Map.Entry<String, Long> entry : finalProperties.entrySet()) {
+ String key = entry.getKey();
+ Long value = entry.getValue();
+ // Verify key is not null
+ assertNotNull(key, "Final key should not be null");
+ // Verify value is not null
+ assertNotNull(value, "Final value should not be null for key: " + key);
+ // Verify key follows expected pattern
+ assertTrue(key.startsWith("key"),
+ "Final key should start with 'key', got: " + key);
+
+ // Verify key format is valid (key followed by a number 0-19)
+ try {
+ String numberPart = key.substring(3); // Remove "key" prefix
+ int keyNumber = Integer.parseInt(numberPart);
+ assertTrue(keyNumber >= 0 && keyNumber < 20,
+ "Key number should be between 0 and 19, got: " + keyNumber);
+ } catch (NumberFormatException e) {
+ fail("Invalid key format: " + key + ". Should be 'keyX' where X is a number");
+ }
+ }
+ } catch (Exception e) {
+ fail("HashMap corruption detected in final state: " + e.getMessage());
+ }
+ }
private static void closeCursorLedger(ManagedCursorImpl managedCursor) {
Awaitility.await().until(managedCursor::closeCursorLedger);
}