Copilot commented on code in PR #25104:
URL: https://github.com/apache/pulsar/pull/25104#discussion_r2642322114


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -204,6 +206,81 @@ public void testOpenCursorWithNullInitialPosition() throws 
Exception {
         assertEquals(cursor.getMarkDeletedPosition(), 
ledger.getLastConfirmedEntry());
     }
 
+    @Test
+    public void testConcurrentPropertyOperationsThreadSafety() throws 
Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedger ledger = 
factory.open("testConcurrentPropertyOperationsThreadSafety", config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor("c_concurrent", null);
+
+        int threadCount = 100;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+        //Stress test with random operations for 5 seconds

Review Comment:
   Missing space after comment marker. Should be "// Stress test" instead of 
"//Stress test".
   ```suggestion
           // Stress test with random operations for 5 seconds
   ```



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -204,6 +206,81 @@ public void testOpenCursorWithNullInitialPosition() throws 
Exception {
         assertEquals(cursor.getMarkDeletedPosition(), 
ledger.getLastConfirmedEntry());
     }
 
+    @Test
+    public void testConcurrentPropertyOperationsThreadSafety() throws 
Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        ManagedLedger ledger = 
factory.open("testConcurrentPropertyOperationsThreadSafety", config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor("c_concurrent", null);
+
+        int threadCount = 100;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+        //Stress test with random operations for 5 seconds
+        long testEndTime = System.currentTimeMillis() + 5000;
+        AtomicLong totalOperations = new AtomicLong(0);
+        AtomicLong exceptionCount = new AtomicLong(0);
+        AtomicBoolean inconsistencyDetected = new AtomicBoolean(false);
+
+        ConcurrentHashMap<String, Long> records = new ConcurrentHashMap<>();
+        while (System.currentTimeMillis() < testEndTime) {
+            executor.submit(() -> {
+                try {
+                    totalOperations.incrementAndGet();
+                    Random random = new Random();
+                    int operationType = random.nextInt(3);
+                    String randomKey = "key" + random.nextInt(20);
+
+                    switch (operationType) {
+                        case 0: // Put operation
+                            Long randomValue = random.nextLong();
+                            cursor.putProperty(randomKey, randomValue);
+                            records.put(randomKey, randomValue);
+                            break;
+
+                        case 1: // Remove operation
+                            cursor.removeProperty(randomKey);
+                            records.remove(randomKey);
+                            break;
+
+                        case 2: // Read and verify operation
+                            Map<String, Long> properties = 
cursor.getProperties();
+                            // Verify no inconsistent state (key exists but 
value is null)
+                            if (properties.containsKey(randomKey) && 
properties.get(randomKey) == null) {
+                                inconsistencyDetected.set(true);
+                                fail("INCONSISTENT STATE DETECTED: Key '" + 
randomKey + "' exists but has null value");
+                            }
+                            break;
+                    }
+                } catch (ConcurrentModificationException cme) {
+                    exceptionCount.incrementAndGet();
+                } catch (Exception e) {
+                    exceptionCount.incrementAndGet();
+                    fail("Unexpected exception: " + e.getMessage());
+                }
+            });
+        }
+
+        executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+
+        // Then: Verify test results
+        // 1. No ConcurrentModificationException should occur with fixed code
+        assertEquals(exceptionCount.get(), 0, "No exceptions should occur with 
thread-safe implementation");
+
+        // 2. No inconsistent states detected
+        assertFalse(inconsistencyDetected.get(), "No inconsistent states (key 
with null value) should be detected");
+
+        // 3. Verify property data
+        Map<String, Long> finalProperties = cursor.getProperties();
+        assertEquals(records.size(), finalProperties.size());
+        try {
+            for (String key : finalProperties.keySet()) {
+                assertEquals(records.get(key), finalProperties.get(key), 
"Values should match for key: " + key);
+            }
+        } catch (Exception e) {
+            fail("HashMap corruption detected: " + e.getMessage());

Review Comment:
   This test has a race condition that undermines its reliability. The main 
thread submits tasks to the executor in a tight loop while checking the time, 
but immediately after the loop exits, it shuts down the executor and verifies 
the final state. However, many tasks may still be queued or executing when 
verification begins. Additionally, the parallel tracking using the 'records' 
ConcurrentHashMap is fundamentally flawed because operations on 'cursor' and 
'records' are not atomic as a pair - between a cursor.putProperty and 
records.put, or between cursor.removeProperty and records.remove, other threads 
can interleave operations, making the final assertion (lines 275-279) 
unreliable.
   
   The test should either:
   1. Collect all Future objects from executor.submit() calls and wait for them 
to complete before verification, or
   2. Remove the 'records' tracking map and only verify that no exceptions 
occur and that the cursor's internal state is consistent (no null values for 
existing keys)
   ```suggestion
           // 3. Verify cursor property data is internally consistent (no null 
keys or values)
           Map<String, Long> finalProperties = cursor.getProperties();
           for (Map.Entry<String, Long> entry : finalProperties.entrySet()) {
               assertNotNull(entry.getKey(), "Property key should not be null");
               assertNotNull(entry.getValue(), "Property value should not be 
null for key: " + entry.getKey());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to