Copilot commented on code in PR #24739:
URL: https://github.com/apache/pulsar/pull/24739#discussion_r2573064464


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -560,38 +653,75 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
                                 immutableBuckets.asMapOfRanges()
                                         .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
                             }
+                        } finally {
+                            writeLock.unlock();
                         }
                     });
         });
     }
 
-    @Override
-    public synchronized boolean hasMessageAvailable() {
-        long cutoffTime = getCutoffTime();
+    private void createBucketSnapshotAsync() {
+        final MutableBucket bucketToSeal = this.bucketBeingSealed;
+        if (bucketToSeal == null || bucketToSeal.isEmpty()) {
+            this.bucketBeingSealed = null;
+            return;
+        }
+        try {
+            long createStartTime = System.currentTimeMillis();
+            stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
+            Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+                    bucketToSeal.sealBucketAndAsyncPersistent(
+                            this.timeStepPerBucketSnapshotSegmentInMillis,
+                            this.maxIndexesPerBucketSnapshotSegment,
+                            this.sharedBucketPriorityQueue);
+            if (immutableBucketDelayedIndexPair == null) {
+                return;
+            }

Review Comment:
   The early return at lines 677-678 skips the cleanup in the outer `finally` block: if `immutableBucketDelayedIndexPair` is null, the method returns without resetting `bucketBeingSealed` via the outer finally block (lines 694-697). This could leave `bucketBeingSealed` pointing to an invalid bucket and prevent future bucket creation. Consider ensuring `bucketBeingSealed` is cleaned up before returning.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -143,11 +191,15 @@ public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumer
         this.immutableBuckets = TreeRangeMap.create();
         this.snapshotSegmentLastIndexMap = new ConcurrentHashMap<>();
         this.lastMutableBucket =
-                new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(),
+                new MutableBucket(context.getName(), context.getCursor(), FutureUtil.Sequencer.create(),
                         bucketSnapshotStorage);
         this.stats = new BucketDelayedMessageIndexStats();
+        ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+        this.readLock = rwLock.readLock();
+        this.writeLock = rwLock.writeLock();
+        bucketSnapshotExecutor = Executors.newSingleThreadScheduledExecutor(
+                new ExecutorProvider.ExtendedThreadFactory("bucket-creation"));

Review Comment:
   The `bucketSnapshotExecutor` uses a single-threaded executor with a generic 
thread name "bucket-creation". Consider using a more descriptive thread name 
that includes the dispatcher/context name (e.g., `context.getName() + 
"-bucket-creation"`) to aid in debugging and thread dumps, especially when 
multiple trackers are running concurrently.
   ```suggestion
                new ExecutorProvider.ExtendedThreadFactory(context.getName() + "-bucket-creation"));
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -744,26 +860,45 @@ public boolean shouldPauseAllDeliveries() {
     }
 
     @Override
-    public synchronized CompletableFuture<Void> clear() {
-        CompletableFuture<Void> future = cleanImmutableBuckets();
-        sharedBucketPriorityQueue.clear();
-        lastMutableBucket.clear();
-        snapshotSegmentLastIndexMap.clear();
-        numberDelayedMessages.set(0);
-        return future;
+    public CompletableFuture<Void> clear() {
+        writeLock.lock();
+        try {
+            CompletableFuture<Void> future = cleanImmutableBuckets();
+            sharedBucketPriorityQueue.clear();
+            lastMutableBucket.clear();
+            snapshotSegmentLastIndexMap.clear();
+            numberDelayedMessages.set(0);
+            return future;
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     @Override
-    public synchronized void close() {
-        super.close();
-        lastMutableBucket.close();
-        sharedBucketPriorityQueue.close();
+    public void close() {
+        List<CompletableFuture<Long>> completableFutures = Collections.emptyList();
+        writeLock.lock();
         try {
-            List<CompletableFuture<Long>> completableFutures = immutableBuckets.asMapOfRanges().values().stream()
-                    .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
-            FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+            super.close();
+            lastMutableBucket.close();
+            sharedBucketPriorityQueue.close();
+            try {
+                completableFutures = immutableBuckets.asMapOfRanges().values().stream()
+                        .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
+            } catch (Exception e) {
+                log.warn("[{}] Failed wait to snapshot generate", context.getName(), e);
+            }
+        } finally {
+            writeLock.unlock();
+        }
+        try {
+            if (!completableFutures.isEmpty()) {
+                FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+            }
         } catch (Exception e) {
-            log.warn("[{}] Failed wait to snapshot generate", dispatcher.getName(), e);
+            log.warn("[{}] Failed wait to snapshot generate", context.getName(), e);
+        } finally {
+            bucketSnapshotExecutor.shutdown();

Review Comment:
   The `bucketSnapshotExecutor` is shut down in the `finally` block at line 901 
without ensuring that pending snapshot tasks complete. If 
`createBucketSnapshotAsync()` is running when `close()` is called, the executor 
shutdown may occur before the task finishes, potentially causing incomplete 
snapshot operations or resource leaks. Consider calling 
`bucketSnapshotExecutor.shutdownNow()` to interrupt ongoing tasks, or 
`awaitTermination()` to wait for completion before proceeding.
   ```suggestion
            bucketSnapshotExecutor.shutdown();
            try {
                if (!bucketSnapshotExecutor.awaitTermination(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
                    log.warn("[{}] bucketSnapshotExecutor did not terminate in the specified time.", context.getName());
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                log.warn("[{}] Interrupted while waiting for bucketSnapshotExecutor to terminate.", context.getName(), ie);
            }
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -365,59 +423,85 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
     }
 
     @Override
-    public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) {
-        if (containsMessage(ledgerId, entryId)) {
-            return true;
-        }
+    public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
+        readLock.lock();
+        try {
+            if (containsMessageUnsafe(ledgerId, entryId)) {
+                return true;
+            }
 
-        if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
-            return false;
+            if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
+                return false;
+            }
+        } finally {
+            readLock.unlock();
         }
 
-        boolean existBucket = findImmutableBucket(ledgerId).isPresent();
-
-        // Create bucket snapshot
-        if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
-                && lastMutableBucket.size() >= minIndexCountPerBucket
-                && !lastMutableBucket.isEmpty()) {
-            long createStartTime = System.currentTimeMillis();
-            stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
-            Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
-                    lastMutableBucket.sealBucketAndAsyncPersistent(
-                            this.timeStepPerBucketSnapshotSegmentInMillis,
-                            this.maxIndexesPerBucketSnapshotSegment,
-                            this.sharedBucketPriorityQueue);
-            afterCreateImmutableBucket(immutableBucketDelayedIndexPair, 
createStartTime);
-            lastMutableBucket.resetLastMutableBucketRange();
+        writeLock.lock();
+        try {
+            // Double check
+            if (containsMessageUnsafe(ledgerId, entryId)) {
+                return true;
+            }
 
-            if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
-                asyncMergeBucketSnapshot();
+            if (deliverAt <= getCutoffTime()) {
+                return false;
             }
-        }
 
-        if (ledgerId < lastMutableBucket.startLedgerId || existBucket) {
-            // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range,
-            // enter sharedBucketPriorityQueue directly
-            sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
-            lastMutableBucket.putIndexBit(ledgerId, entryId);
-        } else {
-            checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
-            lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
-        }
+            final MutableBucket sealingBucket = this.bucketBeingSealed;
+            if (sealingBucket != null
+                    && ledgerId >= sealingBucket.startLedgerId
+                    && ledgerId <= sealingBucket.endLedgerId) {
+                sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
+                sealingBucket.putIndexBit(ledgerId, entryId);
+            } else {
+                boolean existBucket = findImmutableBucket(ledgerId).isPresent();
+
+                if (!existBucket
+                        && ledgerId > lastMutableBucket.endLedgerId
+                        && lastMutableBucket.size() >= minIndexCountPerBucket
+                        && !lastMutableBucket.isEmpty()
+                        && bucketSnapshotInProgress.compareAndSet(false, true)) {
+                    // Create bucket snapshot
+                    this.bucketBeingSealed = this.lastMutableBucket;
+                    this.lastMutableBucket = new MutableBucket(context.getName(), context.getCursor(),
+                            FutureUtil.Sequencer.create(), this.lastMutableBucket.getBucketSnapshotStorage());
+                    bucketSnapshotExecutor.execute(() -> {
+                        try {
+                            createBucketSnapshotAsync();
+                        } finally {
+                            bucketSnapshotInProgress.set(false);
+                        }
+                    });
+                }

Review Comment:
   Race condition in bucket snapshot creation. Between setting 
`bucketBeingSealed` at line 466 and starting the async task at line 469, 
another thread with the write lock could call `addMessage` and access 
`bucketBeingSealed` at line 451, potentially leading to inconsistent state. 
Additionally, since `lastMutableBucket` is replaced at lines 467-468, messages 
being added concurrently might be directed to the wrong bucket. Consider 
ensuring that the bucket swap and async task initiation are atomic or that 
concurrent access is properly serialized.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -560,38 +653,75 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
                                 immutableBuckets.asMapOfRanges()
                                         .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
                             }
+                        } finally {
+                            writeLock.unlock();
                         }
                     });
         });
     }
 
-    @Override
-    public synchronized boolean hasMessageAvailable() {
-        long cutoffTime = getCutoffTime();
+    private void createBucketSnapshotAsync() {
+        final MutableBucket bucketToSeal = this.bucketBeingSealed;
+        if (bucketToSeal == null || bucketToSeal.isEmpty()) {
+            this.bucketBeingSealed = null;
+            return;
+        }
+        try {
+            long createStartTime = System.currentTimeMillis();
+            stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
+            Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+                    bucketToSeal.sealBucketAndAsyncPersistent(
+                            this.timeStepPerBucketSnapshotSegmentInMillis,
+                            this.maxIndexesPerBucketSnapshotSegment,
+                            this.sharedBucketPriorityQueue);
+            if (immutableBucketDelayedIndexPair == null) {
+                return;
+            }
 
-        boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= cutoffTime;
-        if (!hasMessageAvailable) {
-            updateTimer();
+            writeLock.lock();
+            try {
+                afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
+                if (this.bucketBeingSealed == bucketToSeal) {
+                    this.bucketBeingSealed = null;
+                }
+
+                if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
+                    asyncMergeBucketSnapshot();
+                }
+            } finally {
+                writeLock.unlock();
+            }
+        } finally {
+            if (this.bucketBeingSealed == bucketToSeal) {
+                this.bucketBeingSealed = null;
+            }

Review Comment:
   Duplicate cleanup logic in the `finally` block at lines 694-697. The 
`bucketBeingSealed` is already set to null at line 685 within the `writeLock`, 
so the outer `finally` block (lines 694-697) will always execute a redundant 
comparison. Additionally, this outer `finally` lacks lock protection, which 
could lead to race conditions when checking and setting `bucketBeingSealed`. 
Consider removing the duplicate logic or ensuring proper synchronization.
   ```suggestion
   
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -560,38 +653,75 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
                                 immutableBuckets.asMapOfRanges()
                                         .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
                             }
+                        } finally {
+                            writeLock.unlock();
                         }
                     });
         });
     }
 
-    @Override
-    public synchronized boolean hasMessageAvailable() {
-        long cutoffTime = getCutoffTime();
+    private void createBucketSnapshotAsync() {
+        final MutableBucket bucketToSeal = this.bucketBeingSealed;
+        if (bucketToSeal == null || bucketToSeal.isEmpty()) {
+            this.bucketBeingSealed = null;
+            return;
+        }
+        try {
+            long createStartTime = System.currentTimeMillis();
+            stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
+            Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+                    bucketToSeal.sealBucketAndAsyncPersistent(
+                            this.timeStepPerBucketSnapshotSegmentInMillis,
+                            this.maxIndexesPerBucketSnapshotSegment,
+                            this.sharedBucketPriorityQueue);
+            if (immutableBucketDelayedIndexPair == null) {
+                return;
+            }
 
-        boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= cutoffTime;
-        if (!hasMessageAvailable) {
-            updateTimer();
+            writeLock.lock();
+            try {
+                afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
+                if (this.bucketBeingSealed == bucketToSeal) {
+                    this.bucketBeingSealed = null;
+                }
+
+                if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
+                    asyncMergeBucketSnapshot();
+                }
+            } finally {
+                writeLock.unlock();
+            }
+        } finally {
+            if (this.bucketBeingSealed == bucketToSeal) {
+                this.bucketBeingSealed = null;
+            }
         }
-        return hasMessageAvailable;
     }
 
     @Override
-    protected long nextDeliveryTime() {
-        // Use optimistic read for frequently called method
-        long stamp = stampedLock.tryOptimisticRead();
-        long result = nextDeliveryTimeUnsafe();
-
+    public boolean hasMessageAvailable() {
+        writeLock.lock();
+        try {
+            long cutoffTime = getCutoffTime();
 
-        if (!stampedLock.validate(stamp)) {
-            stamp = stampedLock.readLock();
-            try {
-                result = nextDeliveryTimeUnsafe();
-            } finally {
-                stampedLock.unlockRead(stamp);
+            boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTimeUnsafe() <= cutoffTime;
+            if (!hasMessageAvailable) {
+                updateTimer();
             }
+            return hasMessageAvailable;
+        } finally {
+            writeLock.unlock();
+        }
+    }

Review Comment:
   The `hasMessageAvailable()` method uses a write lock when it appears to only 
perform read operations (`getNumberOfDelayedMessages()` and 
`nextDeliveryTimeUnsafe()`). This unnecessarily blocks concurrent readers and 
degrades performance. Consider using a read lock instead unless there's a 
specific reason for the write lock here. If write lock is needed due to 
`updateTimer()` side effects, this should be documented.
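   For illustration, a minimal standalone sketch (not Pulsar code; names are hypothetical) of the behavior the comment recommends: multiple threads can hold the read lock of a `ReentrantReadWriteLock` at the same time, whereas a write lock would serialize them.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   public class ReadLockDemo {
       // Returns true if two threads held the read lock simultaneously.
       public static boolean concurrentReaders() throws InterruptedException {
           ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
           CountDownLatch bothInside = new CountDownLatch(2);
           Runnable reader = () -> {
               rw.readLock().lock();
               try {
                   bothInside.countDown();
                   // Wait (briefly) for the other reader; the latch can only
                   // reach zero here if both reads are concurrent.
                   bothInside.await(5, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               } finally {
                   rw.readLock().unlock();
               }
           };
           Thread t1 = new Thread(reader);
           Thread t2 = new Thread(reader);
           t1.start();
           t2.start();
           t1.join();
           t2.join();
           return bothInside.getCount() == 0;
       }

       public static void main(String[] args) throws InterruptedException {
           System.out.println("concurrent readers: " + concurrentReaders());
       }
   }
   ```

   Replacing `readLock()` with `writeLock()` in the sketch makes the second thread block until the first releases, so the latch times out; that is the reader contention the comment is flagging.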



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -560,38 +653,75 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
                                 immutableBuckets.asMapOfRanges()
                                         .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
                             }
+                        } finally {
+                            writeLock.unlock();
                         }
                     });
         });
     }
 
-    @Override
-    public synchronized boolean hasMessageAvailable() {
-        long cutoffTime = getCutoffTime();
+    private void createBucketSnapshotAsync() {
+        final MutableBucket bucketToSeal = this.bucketBeingSealed;
+        if (bucketToSeal == null || bucketToSeal.isEmpty()) {
+            this.bucketBeingSealed = null;
+            return;
+        }

Review Comment:
   The `bucketBeingSealed` field is accessed and set to null without lock 
protection. In `createBucketSnapshotAsync()` at lines 664-666 and 677, the 
field is read and written outside the `writeLock`. This creates a race 
condition where multiple threads could see `bucketBeingSealed` as null 
concurrently, potentially causing issues with the bucket sealing logic. The 
field should be declared as `volatile` and/or all accesses should be protected 
by the `writeLock`.
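   A standalone sketch of the pattern the comment suggests (hypothetical names; only the idiom is illustrated): declare the field `volatile` so lock-free reads see the latest value, and guard the compound check-and-clear with the write lock so it is atomic.

   ```java
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   public class GuardedFieldDemo {
       // Hypothetical stand-in for the tracker's bucketBeingSealed field:
       // volatile gives visibility to lock-free readers, while the compound
       // check-and-clear below is made atomic by holding the write lock.
       private volatile Object bucketBeingSealed;
       private final ReentrantReadWriteLock.WriteLock writeLock =
               new ReentrantReadWriteLock().writeLock();

       public GuardedFieldDemo(Object initial) {
           this.bucketBeingSealed = initial;
       }

       public Object current() {
           return bucketBeingSealed; // lock-free read, safe thanks to volatile
       }

       public void clearIfStill(Object expected) {
           writeLock.lock();
           try {
               if (bucketBeingSealed == expected) {
                   bucketBeingSealed = null;
               }
           } finally {
               writeLock.unlock();
           }
       }

       public static void main(String[] args) {
           Object sealed = new Object();
           GuardedFieldDemo demo = new GuardedFieldDemo(sealed);
           demo.clearIfStill(new Object()); // different bucket: no-op
           System.out.println(demo.current() == sealed);
           demo.clearIfStill(sealed);       // matching bucket: cleared
           System.out.println(demo.current() == null);
       }
   }
   ```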



##########
microbench/src/main/java/org/apache/pulsar/broker/package-info.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Microbenchmarks for delayed message delivery bucket implementation.
+ *
+ * <p>This package contains JMH benchmarks for testing the performance
+ * characteristics of the BucketDelayedDeliveryTracker, particularly
+ * focusing on thread safety improvements with StampedLock optimistic reads.

Review Comment:
   Documentation mentions "StampedLock optimistic reads" but the implementation 
now uses ReentrantReadWriteLock according to the PR description and code 
changes. Update the documentation to reflect the actual implementation: 
"focusing on thread safety improvements with ReentrantReadWriteLock".
   ```suggestion
    * focusing on thread safety improvements with ReentrantReadWriteLock.
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java:
##########
@@ -430,4 +537,486 @@ public void testOptimisticReadPerformance() throws Exception {
             assertTrue(throughput > 10000, "Should achieve at least 10K ops/sec with " + numThreads + " threads");
         }
     }
+
+    /**
+     * Test concurrent getScheduledMessages() calls with read operations.
+     * getScheduledMessages() uses write lock while read operations use read lock.
+     * Messages are added beforehand to avoid concurrent addMessage calls.
+     */
+    @Test
+    public void testConcurrentGetScheduledMessagesWithReads() throws Exception {
+        // Add messages that will be ready for delivery after a short delay
+        final long baseTime = System.currentTimeMillis();
+        final int totalMessages = 500;
+
+        // Add messages with delivery time slightly in the future, then wait for them to become ready
+        for (int i = 0; i < totalMessages; i++) {
+            tracker.addMessage(i, i, baseTime + 1000);
+        }
+        assertEquals(tracker.getNumberOfDelayedMessages(), totalMessages, "All messages should be added");
+        // Wait for messages to become ready for delivery
+        Thread.sleep(3000);
+        final int numReadThreads = 12;
+        final int numScheduleThreads = 4;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+        final CountDownLatch schedulersDone = new CountDownLatch(numScheduleThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new AtomicReference<>();
+        final AtomicInteger totalMessagesRetrieved = new AtomicInteger(0);
+        // Start read threads (containsMessage and nextDeliveryTime)
+        for (int i = 0; i < numReadThreads; i++) {
+            final int threadId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < 1000; j++) {
+                        if (threadId % 2 == 0) {
+                            tracker.containsMessage(j % totalMessages, j % totalMessages);
+                        } else {
+                            try {
+                                tracker.nextDeliveryTime();
+                            } catch (IllegalArgumentException e) {
+                                // Expected when no messages available
+                            }
+                        }
+                        if (j % 100 == 0) {
+                            Thread.sleep(1);
+                        }
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    readersDone.countDown();
+                }
+            });
+        }
+        // Start getScheduledMessages threads - continue until all messages are retrieved
+        for (int i = 0; i < numScheduleThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    int consecutiveEmptyReturns = 0;
+                    final int maxConsecutiveEmpty = 5;
+
+                    while (totalMessagesRetrieved.get() < totalMessages
+                            && consecutiveEmptyReturns < maxConsecutiveEmpty) {
+                        NavigableSet<Position> messages = tracker.getScheduledMessages(50);
+                        int retrieved = messages.size();
+                        totalMessagesRetrieved.addAndGet(retrieved);
+
+                        if (retrieved == 0) {
+                            consecutiveEmptyReturns++;
+                            Thread.sleep(10);
+                        } else {
+                            consecutiveEmptyReturns = 0;
+                            Thread.sleep(5);
+                        }
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    schedulersDone.countDown();
+                }
+            });
+        }
+        startLatch.countDown();
+        assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds");
+        assertTrue(schedulersDone.await(30, TimeUnit.SECONDS), "Schedulers should complete within 30 seconds");
+        if (errors.get() > 0) {
+            Exception exception = firstException.get();
+            if (exception != null) {
+                throw new AssertionError("Concurrent getScheduledMessages test failed", exception);
+            }
+        }
+        assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
+
+        // Verify that most or all messages were retrieved
+        assertEquals(totalMessagesRetrieved.get(), 500, "All messages should be retrieved");
+
+        log.info("Total messages retrieved: {} out of {}", totalMessagesRetrieved.get(), totalMessages);
+    }
+
+    /**
+     * Test concurrent clear() operations with read operations.
+     * This verifies that clear() properly coordinates with ongoing read operations.
+     * Messages are added beforehand, then clear() is tested with concurrent reads.
+     */
+    @Test
+    public void testConcurrentClearWithReads() throws Exception {
+        final int initialMessages = 1000;
+        final int numReadThreads = 10;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new AtomicReference<>();
+        final AtomicBoolean clearCompleted = new AtomicBoolean(false);
+        // Add initial messages (single thread)
+        for (int i = 0; i < initialMessages; i++) {
+            tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
+        }
+        // Start read threads that will run during clear operation
+        for (int i = 0; i < numReadThreads; i++) {
+            final int threadId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    while (!clearCompleted.get()) {
+                        switch (threadId % 3) {
+                            case 0:
+                                tracker.containsMessage(threadId, threadId);
+                                break;
+                            case 1:
+                                try {
+                                    tracker.nextDeliveryTime();
+                                } catch (IllegalArgumentException e) {
+                                    // Expected when no messages available
+                                }
+                                break;
+                            case 2:
+                                tracker.getNumberOfDelayedMessages();
+                                break;
+                        }
+                        Thread.sleep(1);
+                    }
+                    // Continue reading for a bit after clear
+                    for (int j = 0; j < 100; j++) {
+                        tracker.containsMessage(j, j);
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    readersDone.countDown();
+                }
+            });
+        }
+        // Start clear operation after a short delay
+        executorService.submit(() -> {
+            try {
+                startLatch.await();
+                Thread.sleep(100);
+                tracker.clear().get(30, TimeUnit.SECONDS);
+                clearCompleted.set(true);
+            } catch (Exception e) {
+                errors.incrementAndGet();
+                firstException.compareAndSet(null, e);
+                e.printStackTrace();
+                clearCompleted.set(true);
+            }
+        });
+        startLatch.countDown();
+        assertTrue(readersDone.await(60, TimeUnit.SECONDS), "Readers should complete within 60 seconds");
+        if (errors.get() > 0) {
+            Exception exception = firstException.get();
+            if (exception != null) {
+                throw new AssertionError("Concurrent clear test failed", exception);
+            }
+        }
+        assertEquals(errors.get(), 0, "No exceptions should occur during concurrent clear operations");
+        assertEquals(tracker.getNumberOfDelayedMessages(), 0, "All messages should be cleared");
+    }
+
+    /**
+     * Test concurrent close() operations.
+     * This verifies that close() properly handles concurrent access and shuts down cleanly.
+     * Messages are added beforehand to test close() behavior with existing data.
+     */
+    @Test
+    public void testConcurrentClose() throws Exception {
+        final int numReadThreads = 8;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new AtomicReference<>();
+        final AtomicBoolean closeInitiated = new AtomicBoolean(false);
+        // Add some messages first (single thread)
+        for (int i = 0; i < 100; i++) {
+            tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
+        }
+        // Start read threads that will be interrupted by close
+        for (int i = 0; i < numReadThreads; i++) {
+            final int threadId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    while (!closeInitiated.get()) {
+                        try {
+                            switch (threadId % 4) {
+                                case 0:
+                                    tracker.containsMessage(threadId, threadId);
+                                    break;
+                                case 1:
+                                    tracker.nextDeliveryTime();
+                                    break;
+                                case 2:
+                                    tracker.getNumberOfDelayedMessages();
+                                    break;
+                                case 3:
+                                    tracker.getScheduledMessages(10);
+                                    break;
+                            }
+                        } catch (IllegalArgumentException e) {
+                            // Expected for some operations when tracker is being closed
+                        }
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    // Some exceptions may be expected during close
+                    if (!closeInitiated.get()) {
+                        errors.incrementAndGet();
+                        firstException.compareAndSet(null, e);
+                        e.printStackTrace();
+                    }
+                } finally {
+                    readersDone.countDown();
+                }
+            });
+        }
+        // Start close operation after a short delay
+        executorService.submit(() -> {
+            try {
+                startLatch.await();
+                Thread.sleep(100);
+                closeInitiated.set(true);
+                tracker.close();
+            } catch (Exception e) {
+                errors.incrementAndGet();
+                firstException.compareAndSet(null, e);
+                e.printStackTrace();
+            }
+        });
+        startLatch.countDown();
+        assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds");
+        if (errors.get() > 0) {
+            Exception exception = firstException.get();
+            if (exception != null) {
+                log.warn("Exception during concurrent close test (may be expected): " + exception.getMessage());
+            }
+        }
+        // Create a new tracker for the next test since this one is closed
+        tracker = new BucketDelayedDeliveryTracker(
+                dispatcher, timer, 1000, Clock.systemUTC(), true, storage,
+                100, 1000, 100, 10
+        );
+    }
+
+    /**
+     * Test mixed read operations with sequential addMessage and concurrent getScheduledMessages.
+     * This tests the ReentrantReadWriteLock behavior when read and write operations are mixed.
+     * addMessage is executed in a single thread, while reads and getScheduledMessages run concurrently.
+     * Ensures all deliverable messages are retrieved before test completion.
+     */
+    @Test
+    public void testMixedReadWriteOperationsDeadlockDetection() throws Exception {
+        final int numReadThreads = 16;
+        final int numScheduleThreads = 4;
+        final int totalMessages = 2000;
+        final int readsPerThread = 500;
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+        final CountDownLatch schedulersDone = new CountDownLatch(numScheduleThreads);
+        final CountDownLatch writerDone = new CountDownLatch(1);
+        final AtomicBoolean deadlockDetected = new AtomicBoolean(false);
+        final AtomicInteger completedOperations = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new AtomicReference<>();
+        final AtomicInteger messagesAdded = new AtomicInteger(0);
+        final AtomicInteger deliverableMessagesCount = new AtomicInteger(0);
+        final AtomicInteger totalMessagesRetrieved = new AtomicInteger(0);
+        // Single writer thread for addMessage (sequential execution)
+        executorService.submit(() -> {
+            try {
+                startLatch.await();
+                final long baseTime = System.currentTimeMillis();
+
+                for (int i = 0; i < totalMessages; i++) {
+                    try {
+                        long ledgerId = 10000 + i;
+                        long entryId = i % 1000;
+
+                        // Create mix of messages: some ready for delivery, some delayed
+                        long deliverAt;
+                        if (i % 3 == 0) {
+                            // Messages that will be ready for delivery after a short delay
+                            deliverAt = baseTime + 500;
+                            deliverableMessagesCount.incrementAndGet();
+                        } else {
+                            // Messages for future delivery (much later)
+                            deliverAt = baseTime + 30000;
+                        }
+
+                        boolean added = tracker.addMessage(ledgerId, entryId, deliverAt);
+                        if (added) {
+                            messagesAdded.incrementAndGet();
+                        }
+                        completedOperations.incrementAndGet();
+
+                        if (i % 200 == 0) {
+                            Thread.sleep(1);
+                        }
+                    } catch (Exception e) {
+                        deadlockDetected.set(true);
+                        firstException.compareAndSet(null, e);
+                        e.printStackTrace();
+                        break;
+                    }
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                deadlockDetected.set(true);
+            } catch (Exception e) {
+                deadlockDetected.set(true);
+                firstException.compareAndSet(null, e);
+                e.printStackTrace();
+            } finally {
+                writerDone.countDown();
+            }
+        });
+        // Start read threads (using read locks)
+        for (int i = 0; i < numReadThreads; i++) {
+            final int threadId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    // Continue reading until writer is done, plus some extra operations
+                    int operationCount = 0;
+                    while (writerDone.getCount() > 0 || operationCount < readsPerThread) {
+                        try {
+                            switch (threadId % 3) {
+                                case 0:
+                                    long ledgerId = 10000 + (operationCount % (totalMessages + 100));
+                                    long entryId = operationCount % 1000;
+                                    tracker.containsMessage(ledgerId, entryId);
+                                    break;
+                                case 1:
+                                    tracker.nextDeliveryTime();
+                                    break;
+                                case 2:
+                                    tracker.getNumberOfDelayedMessages();
+                                    break;
+                            }
+                            completedOperations.incrementAndGet();
+                            operationCount++;
+                            if (operationCount % 100 == 0) {
+                                Thread.sleep(1);
+                            }
+                        } catch (IllegalArgumentException e) {
+                            // Expected for some operations
+                            completedOperations.incrementAndGet();
+                            operationCount++;
+                        } catch (Exception e) {
+                            deadlockDetected.set(true);
+                            firstException.compareAndSet(null, e);
+                            e.printStackTrace();
+                            break;
+                        }
+                        if (writerDone.getCount() == 0 && operationCount >= readsPerThread) {
+                            break;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    deadlockDetected.set(true);
+                } catch (Exception e) {
+                    deadlockDetected.set(true);
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    readersDone.countDown();
+                }
+            });
+        }
+        // Start getScheduledMessages threads (using write locks)
+        for (int i = 0; i < numScheduleThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+
+                    // Wait for writer to finish and messages to become deliverable
+                    writerDone.await();
+                    Thread.sleep(1000); // Wait 1 second for messages to become ready for delivery
+
+                    int consecutiveEmptyReturns = 0;
+                    final int maxConsecutiveEmpty = 5;
+
+                    // Continue until we've retrieved all deliverable messages or hit max empty returns
+                    while (totalMessagesRetrieved.get() < deliverableMessagesCount.get()
+                            && consecutiveEmptyReturns < maxConsecutiveEmpty) {
+                        try {
+                            NavigableSet<Position> messages = tracker.getScheduledMessages(50);
+                            int retrieved = messages.size();
+                            totalMessagesRetrieved.addAndGet(retrieved);
+                            completedOperations.incrementAndGet();
+
+                            if (retrieved == 0) {
+                                consecutiveEmptyReturns++;
+                                Thread.sleep(5); // Short wait for more messages
+                            } else {
+                                consecutiveEmptyReturns = 0;
+                                Thread.sleep(2); // Short processing delay
+                            }
+
+                        } catch (Exception e) {
+                            deadlockDetected.set(true);
+                            firstException.compareAndSet(null, e);
+                            e.printStackTrace();
+                            break;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    deadlockDetected.set(true);
+                } catch (Exception e) {
+                    deadlockDetected.set(true);
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    schedulersDone.countDown();
+                }
+            });
+        }
+        // Start all threads
+        startLatch.countDown();
+
+        // Wait for completion with reasonable timeout to detect deadlocks
+        boolean writerCompleted = writerDone.await(10, TimeUnit.SECONDS);
+        boolean readersCompleted = readersDone.await(15, TimeUnit.SECONDS);
+        boolean schedulersCompleted = schedulersDone.await(20, TimeUnit.SECONDS);
+        if (!writerCompleted || !readersCompleted || !schedulersCompleted) {
+            deadlockDetected.set(true);
+            log.error("Test timed out - potential deadlock detected. Writer: {}, Readers: {}, Schedulers: {}",
+                    writerCompleted, readersCompleted, schedulersCompleted);
+        }
+        if (deadlockDetected.get()) {
+            Exception e = firstException.get();
+            if (e != null) {
+                throw new AssertionError("Deadlock or exception detected during mixed operations test", e);
+            } else {
+                throw new AssertionError("Deadlock detected - test did not complete within timeout");
+            }
+        }
+
+        // Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+        //         assertEquals(deliverableMessagesCount.get(), totalMessages));

Review Comment:
   [nitpick] Commented-out code at lines 1006-1007 should be removed if not needed. If this code serves as documentation or is planned for future use, add a comment explaining why it's retained.
   ```suggestion
   
   ```
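   For reference, the coordination idiom these tests rely on can be sketched standalone. The snippet below is an illustrative reduction, not code from the PR: class name, thread count, and timeouts are arbitrary, and the body of each worker is elided where the real tests would call the tracker. It shows the start-gate pattern (one `CountDownLatch` released so all workers begin together to maximize contention), first-exception capture via `compareAndSet`, and a bounded `await` that doubles as deadlock detection.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicReference;

   public class ConcurrencyGateSketch {
       public static void main(String[] args) throws Exception {
           final int workers = 4;
           ExecutorService pool = Executors.newFixedThreadPool(workers);
           CountDownLatch start = new CountDownLatch(1);   // gate: released once, for everyone
           CountDownLatch done = new CountDownLatch(workers);
           AtomicInteger errors = new AtomicInteger();
           AtomicReference<Exception> first = new AtomicReference<>();
           for (int i = 0; i < workers; i++) {
               pool.submit(() -> {
                   try {
                       start.await();                      // all workers begin together
                       // ... concurrent tracker operations would go here ...
                   } catch (Exception e) {
                       errors.incrementAndGet();
                       first.compareAndSet(null, e);       // record only the first failure
                   } finally {
                       done.countDown();
                   }
               });
           }
           start.countDown();                              // open the gate
           if (!done.await(10, TimeUnit.SECONDS)) {        // timeout doubles as deadlock detection
               throw new AssertionError("workers did not finish: possible deadlock");
           }
           if (errors.get() > 0) {
               throw new AssertionError("concurrent failure", first.get());
           }
           pool.shutdownNow();
           System.out.println("ok");
       }
   }
   ```

   Catching `Exception` inside each lambda is what lets it compile as a `Runnable`; the `finally { done.countDown(); }` guarantees the final `await` cannot hang on a worker that failed.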



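The mixed read/write test above targets `ReentrantReadWriteLock` semantics: many readers may hold the shared lock at once, while a writer needs exclusive access. A minimal sketch of that split, with hypothetical method names standing in for the tracker's read paths (`nextDeliveryTime`, `containsMessage`) and write paths (`addMessage`, `getScheduledMessages`):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private long nextDeliveryTime = Long.MAX_VALUE;

    // Read path: shared lock, many threads may read concurrently.
    public long readNextDeliveryTime() {
        lock.readLock().lock();
        try {
            return nextDeliveryTime;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Write path: exclusive lock, blocks until all readers release.
    public void recordDeliveryTime(long t) {
        lock.writeLock().lock();
        try {
            nextDeliveryTime = Math.min(nextDeliveryTime, t);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteLockSketch s = new ReadWriteLockSketch();
        s.recordDeliveryTime(42L);
        System.out.println(s.readNextDeliveryTime()); // prints 42
    }
}
```

The deadlock risk the test probes comes from mixing the two paths: a thread that tries to take the write lock while another holds the read lock (or vice versa) must wait, so unlock in a `finally` block is essential on both paths.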
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
