Copilot commented on code in PR #24739:
URL: https://github.com/apache/pulsar/pull/24739#discussion_r2573064464
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -560,38 +653,75 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
}
+ } finally {
+ writeLock.unlock();
}
});
});
}
- @Override
- public synchronized boolean hasMessageAvailable() {
- long cutoffTime = getCutoffTime();
+ private void createBucketSnapshotAsync() {
+ final MutableBucket bucketToSeal = this.bucketBeingSealed;
+ if (bucketToSeal == null || bucketToSeal.isEmpty()) {
+ this.bucketBeingSealed = null;
+ return;
+ }
+ try {
+ long createStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
+ Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+ bucketToSeal.sealBucketAndAsyncPersistent(
+ this.timeStepPerBucketSnapshotSegmentInMillis,
+ this.maxIndexesPerBucketSnapshotSegment,
+ this.sharedBucketPriorityQueue);
+ if (immutableBucketDelayedIndexPair == null) {
+ return;
+ }
Review Comment:
The early return at lines 677-678 bypasses the lock-protected cleanup. If
`immutableBucketDelayedIndexPair` is null, the method returns and
`bucketBeingSealed` is reset only by the outer `finally` block (lines 694-697),
which runs without holding the write lock. This could leave `bucketBeingSealed`
pointing at an invalid bucket and prevent future bucket creation. Consider
resetting `bucketBeingSealed` under the write lock before returning.
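One possible shape for that cleanup, sketched as a suggestion (the indentation and the enclosing try/finally structure are assumptions and would need to match the actual method):
```suggestion
            if (immutableBucketDelayedIndexPair == null) {
                writeLock.lock();
                try {
                    // Reset under the write lock so concurrent callers
                    // never observe a stale sealing bucket.
                    if (this.bucketBeingSealed == bucketToSeal) {
                        this.bucketBeingSealed = null;
                    }
                } finally {
                    writeLock.unlock();
                }
                return;
            }
```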
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -143,11 +191,15 @@ public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumer
this.immutableBuckets = TreeRangeMap.create();
this.snapshotSegmentLastIndexMap = new ConcurrentHashMap<>();
this.lastMutableBucket =
- new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(),
+ new MutableBucket(context.getName(), context.getCursor(), FutureUtil.Sequencer.create(),
bucketSnapshotStorage);
this.stats = new BucketDelayedMessageIndexStats();
+ ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ this.readLock = rwLock.readLock();
+ this.writeLock = rwLock.writeLock();
+ bucketSnapshotExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ExecutorProvider.ExtendedThreadFactory("bucket-creation"));
Review Comment:
The `bucketSnapshotExecutor` uses a single-threaded executor with a generic
thread name "bucket-creation". Consider using a more descriptive thread name
that includes the dispatcher/context name (e.g., `context.getName() +
"-bucket-creation"`) to aid in debugging and thread dumps, especially when
multiple trackers are running concurrently.
```suggestion
new ExecutorProvider.ExtendedThreadFactory(context.getName() + "-bucket-creation"));
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -744,26 +860,45 @@ public boolean shouldPauseAllDeliveries() {
}
@Override
- public synchronized CompletableFuture<Void> clear() {
- CompletableFuture<Void> future = cleanImmutableBuckets();
- sharedBucketPriorityQueue.clear();
- lastMutableBucket.clear();
- snapshotSegmentLastIndexMap.clear();
- numberDelayedMessages.set(0);
- return future;
+ public CompletableFuture<Void> clear() {
+ writeLock.lock();
+ try {
+ CompletableFuture<Void> future = cleanImmutableBuckets();
+ sharedBucketPriorityQueue.clear();
+ lastMutableBucket.clear();
+ snapshotSegmentLastIndexMap.clear();
+ numberDelayedMessages.set(0);
+ return future;
+ } finally {
+ writeLock.unlock();
+ }
}
@Override
- public synchronized void close() {
- super.close();
- lastMutableBucket.close();
- sharedBucketPriorityQueue.close();
+ public void close() {
+ List<CompletableFuture<Long>> completableFutures = Collections.emptyList();
+ writeLock.lock();
try {
- List<CompletableFuture<Long>> completableFutures = immutableBuckets.asMapOfRanges().values().stream()
- .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
- FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ super.close();
+ lastMutableBucket.close();
+ sharedBucketPriorityQueue.close();
+ try {
+ completableFutures = immutableBuckets.asMapOfRanges().values().stream()
+ .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
+ } catch (Exception e) {
+ log.warn("[{}] Failed wait to snapshot generate", context.getName(), e);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ try {
+ if (!completableFutures.isEmpty()) {
+ FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+ }
} catch (Exception e) {
- log.warn("[{}] Failed wait to snapshot generate", dispatcher.getName(), e);
+ log.warn("[{}] Failed wait to snapshot generate", context.getName(), e);
+ } finally {
+ bucketSnapshotExecutor.shutdown();
Review Comment:
The `bucketSnapshotExecutor` is shut down in the `finally` block at line 901
without ensuring that pending snapshot tasks complete. If
`createBucketSnapshotAsync()` is running when `close()` is called, the executor
shutdown may occur before the task finishes, potentially causing incomplete
snapshot operations or resource leaks. Consider calling
`bucketSnapshotExecutor.shutdownNow()` to interrupt ongoing tasks, or
`awaitTermination()` to wait for completion before proceeding.
```suggestion
bucketSnapshotExecutor.shutdown();
try {
if (!bucketSnapshotExecutor.awaitTermination(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
log.warn("[{}] bucketSnapshotExecutor did not terminate in the specified time.", context.getName());
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.warn("[{}] Interrupted while waiting for bucketSnapshotExecutor to terminate.", context.getName(), ie);
}
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -365,59 +423,85 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
}
@Override
- public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) {
- if (containsMessage(ledgerId, entryId)) {
- return true;
- }
+ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
+ readLock.lock();
+ try {
+ if (containsMessageUnsafe(ledgerId, entryId)) {
+ return true;
+ }
- if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
- return false;
+ if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
+ return false;
+ }
+ } finally {
+ readLock.unlock();
}
- boolean existBucket = findImmutableBucket(ledgerId).isPresent();
-
- // Create bucket snapshot
- if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
- && lastMutableBucket.size() >= minIndexCountPerBucket
- && !lastMutableBucket.isEmpty()) {
- long createStartTime = System.currentTimeMillis();
- stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
- Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
- lastMutableBucket.sealBucketAndAsyncPersistent(
- this.timeStepPerBucketSnapshotSegmentInMillis,
- this.maxIndexesPerBucketSnapshotSegment,
- this.sharedBucketPriorityQueue);
- afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
- lastMutableBucket.resetLastMutableBucketRange();
+ writeLock.lock();
+ try {
+ // Double check
+ if (containsMessageUnsafe(ledgerId, entryId)) {
+ return true;
+ }
- if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
- asyncMergeBucketSnapshot();
+ if (deliverAt <= getCutoffTime()) {
+ return false;
}
- }
- if (ledgerId < lastMutableBucket.startLedgerId || existBucket) {
- // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range,
- // enter sharedBucketPriorityQueue directly
- sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
- lastMutableBucket.putIndexBit(ledgerId, entryId);
- } else {
- checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
- lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
- }
+ final MutableBucket sealingBucket = this.bucketBeingSealed;
+ if (sealingBucket != null
+ && ledgerId >= sealingBucket.startLedgerId
+ && ledgerId <= sealingBucket.endLedgerId) {
+ sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
+ sealingBucket.putIndexBit(ledgerId, entryId);
+ } else {
+ boolean existBucket = findImmutableBucket(ledgerId).isPresent();
+
+ if (!existBucket
+ && ledgerId > lastMutableBucket.endLedgerId
+ && lastMutableBucket.size() >= minIndexCountPerBucket
+ && !lastMutableBucket.isEmpty()
+ && bucketSnapshotInProgress.compareAndSet(false, true)) {
+ // Create bucket snapshot
+ this.bucketBeingSealed = this.lastMutableBucket;
+ this.lastMutableBucket = new MutableBucket(context.getName(), context.getCursor(),
+ FutureUtil.Sequencer.create(), this.lastMutableBucket.getBucketSnapshotStorage());
+ bucketSnapshotExecutor.execute(() -> {
+ try {
+ createBucketSnapshotAsync();
+ } finally {
+ bucketSnapshotInProgress.set(false);
+ }
+ });
+ }
Review Comment:
Race condition in bucket snapshot creation. Between setting
`bucketBeingSealed` at line 466 and starting the async task at line 469,
another thread with the write lock could call `addMessage` and access
`bucketBeingSealed` at line 451, potentially leading to inconsistent state.
Additionally, since `lastMutableBucket` is replaced at lines 467-468, messages
being added concurrently might be directed to the wrong bucket. Consider
ensuring that the bucket swap and async task initiation are atomic or that
concurrent access is properly serialized.
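One way to narrow that window is to capture the sealed bucket in a local and hand it to the task explicitly, so the async path never re-reads the mutable field. A sketch (the parameterized `createBucketSnapshotAsync(MutableBucket)` overload is hypothetical, not in the PR):

```java
// Sketch only; still executed while holding writeLock, so readers
// cannot observe a half-done swap. The overload taking the sealed
// bucket as a parameter is hypothetical.
final MutableBucket sealed = this.lastMutableBucket;
this.bucketBeingSealed = sealed;
this.lastMutableBucket = new MutableBucket(context.getName(), context.getCursor(),
        FutureUtil.Sequencer.create(), sealed.getBucketSnapshotStorage());
bucketSnapshotExecutor.execute(() -> {
    try {
        createBucketSnapshotAsync(sealed);
    } finally {
        bucketSnapshotInProgress.set(false);
    }
});
```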
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -560,38 +653,75 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
}
+ } finally {
+ writeLock.unlock();
}
});
});
}
- @Override
- public synchronized boolean hasMessageAvailable() {
- long cutoffTime = getCutoffTime();
+ private void createBucketSnapshotAsync() {
+ final MutableBucket bucketToSeal = this.bucketBeingSealed;
+ if (bucketToSeal == null || bucketToSeal.isEmpty()) {
+ this.bucketBeingSealed = null;
+ return;
+ }
+ try {
+ long createStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
+ Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+ bucketToSeal.sealBucketAndAsyncPersistent(
+ this.timeStepPerBucketSnapshotSegmentInMillis,
+ this.maxIndexesPerBucketSnapshotSegment,
+ this.sharedBucketPriorityQueue);
+ if (immutableBucketDelayedIndexPair == null) {
+ return;
+ }
- boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= cutoffTime;
- if (!hasMessageAvailable) {
- updateTimer();
+ writeLock.lock();
+ try {
+ afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
+ if (this.bucketBeingSealed == bucketToSeal) {
+ this.bucketBeingSealed = null;
+ }
+
+ if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
+ asyncMergeBucketSnapshot();
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ } finally {
+ if (this.bucketBeingSealed == bucketToSeal) {
+ this.bucketBeingSealed = null;
+ }
Review Comment:
Duplicate cleanup logic in the `finally` block at lines 694-697. The
`bucketBeingSealed` is already set to null at line 685 within the `writeLock`,
so the outer `finally` block (lines 694-697) will always execute a redundant
comparison. Additionally, this outer `finally` lacks lock protection, which
could lead to race conditions when checking and setting `bucketBeingSealed`.
Consider removing the duplicate logic or ensuring proper synchronization.
```suggestion
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -560,38 +653,75 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
}
+ } finally {
+ writeLock.unlock();
}
});
});
}
- @Override
- public synchronized boolean hasMessageAvailable() {
- long cutoffTime = getCutoffTime();
+ private void createBucketSnapshotAsync() {
+ final MutableBucket bucketToSeal = this.bucketBeingSealed;
+ if (bucketToSeal == null || bucketToSeal.isEmpty()) {
+ this.bucketBeingSealed = null;
+ return;
+ }
+ try {
+ long createStartTime = System.currentTimeMillis();
+ stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
+ Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
+ bucketToSeal.sealBucketAndAsyncPersistent(
+ this.timeStepPerBucketSnapshotSegmentInMillis,
+ this.maxIndexesPerBucketSnapshotSegment,
+ this.sharedBucketPriorityQueue);
+ if (immutableBucketDelayedIndexPair == null) {
+ return;
+ }
- boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= cutoffTime;
- if (!hasMessageAvailable) {
- updateTimer();
+ writeLock.lock();
+ try {
+ afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
+ if (this.bucketBeingSealed == bucketToSeal) {
+ this.bucketBeingSealed = null;
+ }
+
+ if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
+ asyncMergeBucketSnapshot();
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ } finally {
+ if (this.bucketBeingSealed == bucketToSeal) {
+ this.bucketBeingSealed = null;
+ }
}
- return hasMessageAvailable;
}
@Override
- protected long nextDeliveryTime() {
- // Use optimistic read for frequently called method
- long stamp = stampedLock.tryOptimisticRead();
- long result = nextDeliveryTimeUnsafe();
-
+ public boolean hasMessageAvailable() {
+ writeLock.lock();
+ try {
+ long cutoffTime = getCutoffTime();
- if (!stampedLock.validate(stamp)) {
- stamp = stampedLock.readLock();
- try {
- result = nextDeliveryTimeUnsafe();
- } finally {
- stampedLock.unlockRead(stamp);
+ boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTimeUnsafe() <= cutoffTime;
+ if (!hasMessageAvailable) {
+ updateTimer();
}
+ return hasMessageAvailable;
+ } finally {
+ writeLock.unlock();
+ }
+ }
Review Comment:
The `hasMessageAvailable()` method takes the write lock even though it appears
to perform only read operations (`getNumberOfDelayedMessages()` and
`nextDeliveryTimeUnsafe()`). This unnecessarily blocks concurrent readers and
degrades performance. Consider using a read lock instead unless there is a
specific reason for the write lock here. If the write lock is needed because of
`updateTimer()` side effects, that should be documented.
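A sketch of the read-lock variant (this assumes `updateTimer()` is itself safe to invoke while only a shared lock is held, which would need to be verified before making the change):

```java
// Sketch only: same logic as the PR, but under the shared read lock.
@Override
public boolean hasMessageAvailable() {
    readLock.lock();
    try {
        long cutoffTime = getCutoffTime();
        // Both calls below only read tracker state.
        boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0
                && nextDeliveryTimeUnsafe() <= cutoffTime;
        if (!hasMessageAvailable) {
            updateTimer(); // side effect: must be safe under a shared lock
        }
        return hasMessageAvailable;
    } finally {
        readLock.unlock();
    }
}
```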
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java:
##########
@@ -560,38 +653,75 @@ private synchronized CompletableFuture<Void>
asyncMergeBucketSnapshot(List<Immut
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
}
+ } finally {
+ writeLock.unlock();
}
});
});
}
- @Override
- public synchronized boolean hasMessageAvailable() {
- long cutoffTime = getCutoffTime();
+ private void createBucketSnapshotAsync() {
+ final MutableBucket bucketToSeal = this.bucketBeingSealed;
+ if (bucketToSeal == null || bucketToSeal.isEmpty()) {
+ this.bucketBeingSealed = null;
+ return;
+ }
Review Comment:
The `bucketBeingSealed` field is accessed and set to null without lock
protection. In `createBucketSnapshotAsync()` at lines 664-666 and 677, the
field is read and written outside the `writeLock`. This creates a race
condition where multiple threads could see `bucketBeingSealed` as null
concurrently, potentially causing issues with the bucket sealing logic. The
field should be declared as `volatile` and/or all accesses should be protected
by the `writeLock`.
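For the visibility half of the fix, the declaration change is small (field modifiers other than `volatile` are assumed here):

```java
// Written under writeLock; volatile ensures the unlocked reads in
// createBucketSnapshotAsync() observe the latest value.
private volatile MutableBucket bucketBeingSealed;
```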
##########
microbench/src/main/java/org/apache/pulsar/broker/package-info.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Microbenchmarks for delayed message delivery bucket implementation.
+ *
+ * <p>This package contains JMH benchmarks for testing the performance
+ * characteristics of the BucketDelayedDeliveryTracker, particularly
+ * focusing on thread safety improvements with StampedLock optimistic reads.
Review Comment:
Documentation mentions "StampedLock optimistic reads" but the implementation
now uses ReentrantReadWriteLock according to the PR description and code
changes. Update the documentation to reflect the actual implementation:
"focusing on thread safety improvements with ReentrantReadWriteLock".
```suggestion
* focusing on thread safety improvements with ReentrantReadWriteLock.
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java:
##########
@@ -430,4 +537,486 @@ public void testOptimisticReadPerformance() throws Exception {
assertTrue(throughput > 10000, "Should achieve at least 10K ops/sec with " + numThreads + " threads");
}
}
+
+ /**
+ * Test concurrent getScheduledMessages() calls with read operations.
+ * getScheduledMessages() uses write lock while read operations use read lock.
+ * Messages are added beforehand to avoid concurrent addMessage calls.
+ */
+ @Test
+ public void testConcurrentGetScheduledMessagesWithReads() throws Exception {
+ // Add messages that will be ready for delivery after a short delay
+ final long baseTime = System.currentTimeMillis();
+ final int totalMessages = 500;
+
+ // Add messages with delivery time slightly in the future, then wait for them to become ready
+ for (int i = 0; i < totalMessages; i++) {
+ tracker.addMessage(i, i, baseTime + 1000);
+ }
+ assertEquals(tracker.getNumberOfDelayedMessages(), totalMessages, "All messages should be added");
+ // Wait for messages to become ready for delivery
+ Thread.sleep(3000);
+ final int numReadThreads = 12;
+ final int numScheduleThreads = 4;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final CountDownLatch schedulersDone = new CountDownLatch(numScheduleThreads);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicReference<Exception> firstException = new AtomicReference<>();
+ final AtomicInteger totalMessagesRetrieved = new AtomicInteger(0);
+ // Start read threads (containsMessage and nextDeliveryTime)
+ for (int i = 0; i < numReadThreads; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ for (int j = 0; j < 1000; j++) {
+ if (threadId % 2 == 0) {
+ tracker.containsMessage(j % totalMessages, j % totalMessages);
+ } else {
+ try {
+ tracker.nextDeliveryTime();
+ } catch (IllegalArgumentException e) {
+ // Expected when no messages available
+ }
+ }
+ if (j % 100 == 0) {
+ Thread.sleep(1);
+ }
+ }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ readersDone.countDown();
+ }
+ });
+ }
+ // Start getScheduledMessages threads - continue until all messages are retrieved
+ for (int i = 0; i < numScheduleThreads; i++) {
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ int consecutiveEmptyReturns = 0;
+ final int maxConsecutiveEmpty = 5;
+
+ while (totalMessagesRetrieved.get() < totalMessages
+ && consecutiveEmptyReturns < maxConsecutiveEmpty) {
+ NavigableSet<Position> messages = tracker.getScheduledMessages(50);
+ int retrieved = messages.size();
+ totalMessagesRetrieved.addAndGet(retrieved);
+
+ if (retrieved == 0) {
+ consecutiveEmptyReturns++;
+ Thread.sleep(10);
+ } else {
+ consecutiveEmptyReturns = 0;
+ Thread.sleep(5);
+ }
+ }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ schedulersDone.countDown();
+ }
+ });
+ }
+ startLatch.countDown();
+ assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds");
+ assertTrue(schedulersDone.await(30, TimeUnit.SECONDS), "Schedulers should complete within 30 seconds");
+ if (errors.get() > 0) {
+ Exception exception = firstException.get();
+ if (exception != null) {
+ throw new AssertionError("Concurrent getScheduledMessages test failed", exception);
+ }
+ }
+ assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
+
+ // Verify that most or all messages were retrieved
+ assertEquals(totalMessagesRetrieved.get(), 500, "All messages should be retrieved");
+
+ log.info("Total messages retrieved: {} out of {}", totalMessagesRetrieved.get(), totalMessages);
+ }
+
+ /**
+ * Test concurrent clear() operations with read operations.
+ * This verifies that clear() properly coordinates with ongoing read operations.
+ * Messages are added beforehand, then clear() is tested with concurrent reads.
+ */
+ @Test
+ public void testConcurrentClearWithReads() throws Exception {
+ final int initialMessages = 1000;
+ final int numReadThreads = 10;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicReference<Exception> firstException = new AtomicReference<>();
+ final AtomicBoolean clearCompleted = new AtomicBoolean(false);
+ // Add initial messages (single thread)
+ for (int i = 0; i < initialMessages; i++) {
+ tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
+ }
+ // Start read threads that will run during clear operation
+ for (int i = 0; i < numReadThreads; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ while (!clearCompleted.get()) {
+ switch (threadId % 3) {
+ case 0:
+ tracker.containsMessage(threadId, threadId);
+ break;
+ case 1:
+ try {
+ tracker.nextDeliveryTime();
+ } catch (IllegalArgumentException e) {
+ // Expected when no messages available
+ }
+ break;
+ case 2:
+ tracker.getNumberOfDelayedMessages();
+ break;
+ }
+ Thread.sleep(1);
+ }
+ // Continue reading for a bit after clear
+ for (int j = 0; j < 100; j++) {
+ tracker.containsMessage(j, j);
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ readersDone.countDown();
+ }
+ });
+ }
+ // Start clear operation after a short delay
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ Thread.sleep(100);
+ tracker.clear().get(30, TimeUnit.SECONDS);
+ clearCompleted.set(true);
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ clearCompleted.set(true);
+ }
+ });
+ startLatch.countDown();
+ assertTrue(readersDone.await(60, TimeUnit.SECONDS), "Readers should complete within 60 seconds");
+ if (errors.get() > 0) {
+ Exception exception = firstException.get();
+ if (exception != null) {
+ throw new AssertionError("Concurrent clear test failed", exception);
+ }
+ }
+ assertEquals(errors.get(), 0, "No exceptions should occur during concurrent clear operations");
+ assertEquals(tracker.getNumberOfDelayedMessages(), 0, "All messages should be cleared");
+ }
+
+ /**
+ * Test concurrent close() operations.
+ * This verifies that close() properly handles concurrent access and shuts down cleanly.
+ * Messages are added beforehand to test close() behavior with existing data.
+ */
+ @Test
+ public void testConcurrentClose() throws Exception {
+ final int numReadThreads = 8;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicReference<Exception> firstException = new AtomicReference<>();
+ final AtomicBoolean closeInitiated = new AtomicBoolean(false);
+ // Add some messages first (single thread)
+ for (int i = 0; i < 100; i++) {
+ tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
+ }
+ // Start read threads that will be interrupted by close
+ for (int i = 0; i < numReadThreads; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ while (!closeInitiated.get()) {
+ try {
+ switch (threadId % 4) {
+ case 0:
tracker.containsMessage(threadId, threadId);
+ break;
+ case 1:
+ tracker.nextDeliveryTime();
+ break;
+ case 2:
+ tracker.getNumberOfDelayedMessages();
+ break;
+ case 3:
+ tracker.getScheduledMessages(10);
+ break;
+ }
+ } catch (IllegalArgumentException e) {
+ // Expected for some operations when tracker is being closed
+ }
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ // Some exceptions may be expected during close
+ if (!closeInitiated.get()) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ }
+ } finally {
+ readersDone.countDown();
+ }
+ });
+ }
+ // Start close operation after a short delay
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ Thread.sleep(100);
+ closeInitiated.set(true);
+ tracker.close();
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ }
+ });
+ startLatch.countDown();
+ assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete within 30 seconds");
+ if (errors.get() > 0) {
+ Exception exception = firstException.get();
+ if (exception != null) {
+ log.warn("Exception during concurrent close test (may be expected): " + exception.getMessage());
+ }
+ }
+ // Create a new tracker for the next test since this one is closed
+ tracker = new BucketDelayedDeliveryTracker(
+ dispatcher, timer, 1000, Clock.systemUTC(), true, storage,
+ 100, 1000, 100, 10
+ );
+ }
+
+ /**
+ * Test mixed read operations with sequential addMessage and concurrent getScheduledMessages.
+ * This tests the ReentrantReadWriteLock behavior when read and write operations are mixed.
+ * addMessage is executed in single thread, while reads and getScheduledMessages are concurrent.
+ * Ensures all deliverable messages are retrieved before test completion.
+ */
+ @Test
+ public void testMixedReadWriteOperationsDeadlockDetection() throws Exception {
+ final int numReadThreads = 16;
+ final int numScheduleThreads = 4;
+ final int totalMessages = 2000;
+ final int readsPerThread = 500;
+
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+ final CountDownLatch schedulersDone = new CountDownLatch(numScheduleThreads);
+ final CountDownLatch writerDone = new CountDownLatch(1);
+ final AtomicBoolean deadlockDetected = new AtomicBoolean(false);
+ final AtomicInteger completedOperations = new AtomicInteger(0);
+ final AtomicReference<Exception> firstException = new AtomicReference<>();
+ final AtomicInteger messagesAdded = new AtomicInteger(0);
+ final AtomicInteger deliverableMessagesCount = new AtomicInteger(0);
+ final AtomicInteger totalMessagesRetrieved = new AtomicInteger(0);
+ // Single writer thread for addMessage (sequential execution)
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ final long baseTime = System.currentTimeMillis();
+
+ for (int i = 0; i < totalMessages; i++) {
+ try {
+ long ledgerId = 10000 + i;
+ long entryId = i % 1000;
+
+ // Create mix of messages: some ready for delivery, some delayed
+ long deliverAt;
+ if (i % 3 == 0) {
+ // Messages that will be ready for delivery after a short delay
+ deliverAt = baseTime + 500;
+ deliverableMessagesCount.incrementAndGet();
+ } else {
+ // Messages for future delivery (much later)
+ deliverAt = baseTime + 30000;
+ }
+
+ boolean added = tracker.addMessage(ledgerId, entryId, deliverAt);
+ if (added) {
+ messagesAdded.incrementAndGet();
+ }
+ completedOperations.incrementAndGet();
+
+ if (i % 200 == 0) {
+ Thread.sleep(1);
+ }
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ deadlockDetected.set(true);
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ writerDone.countDown();
+ }
+ });
+ // Start read threads (using read locks)
+ for (int i = 0; i < numReadThreads; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ // Continue reading until writer is done, plus some extra operations
+ int operationCount = 0;
+ while ((writerDone.getCount() > 0 || operationCount < readsPerThread)) {
+ try {
+ switch (threadId % 3) {
+ case 0:
+ long ledgerId = 10000 + (operationCount % (totalMessages + 100));
+ long entryId = operationCount % 1000;
+ tracker.containsMessage(ledgerId, entryId);
+ break;
+ case 1:
+ tracker.nextDeliveryTime();
+ break;
+ case 2:
+ tracker.getNumberOfDelayedMessages();
+ break;
+ }
+ completedOperations.incrementAndGet();
+ operationCount++;
+ if (operationCount % 100 == 0) {
+ Thread.sleep(1);
+ }
+ } catch (IllegalArgumentException e) {
+ // Expected for some operations
+ completedOperations.incrementAndGet();
+ operationCount++;
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ break;
+ }
+ if (writerDone.getCount() == 0 && operationCount >= readsPerThread) {
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ deadlockDetected.set(true);
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ readersDone.countDown();
+ }
+ });
+ }
+ // Start getScheduledMessages threads (using write locks)
+ for (int i = 0; i < numScheduleThreads; i++) {
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+
+ // Wait for writer to finish and messages to become deliverable
+ writerDone.await();
+ Thread.sleep(1000); // Wait 1 second for messages to become ready for delivery
+
+ int consecutiveEmptyReturns = 0;
+ final int maxConsecutiveEmpty = 5;
+
+ // Continue until we've retrieved all deliverable messages or hit max empty returns
+ while (totalMessagesRetrieved.get() < deliverableMessagesCount.get()
+ && consecutiveEmptyReturns < maxConsecutiveEmpty) {
+ try {
+ NavigableSet<Position> messages = tracker.getScheduledMessages(50);
+ int retrieved = messages.size();
+ totalMessagesRetrieved.addAndGet(retrieved);
+ completedOperations.incrementAndGet();
+
+ if (retrieved == 0) {
+ consecutiveEmptyReturns++;
+ Thread.sleep(5); // Short wait for more messages
+ } else {
+ consecutiveEmptyReturns = 0;
+ Thread.sleep(2); // Short processing delay
+ }
+
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ break;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ deadlockDetected.set(true);
+ } catch (Exception e) {
+ deadlockDetected.set(true);
+ firstException.compareAndSet(null, e);
+ e.printStackTrace();
+ } finally {
+ schedulersDone.countDown();
+ }
+ });
+ }
+ // Start all threads
+ startLatch.countDown();
+
+ // Wait for completion with reasonable timeout to detect deadlocks
+ boolean writerCompleted = writerDone.await(10, TimeUnit.SECONDS);
+ boolean readersCompleted = readersDone.await(15, TimeUnit.SECONDS);
+ boolean schedulersCompleted = schedulersDone.await(20, TimeUnit.SECONDS);
+ if (!writerCompleted || !readersCompleted || !schedulersCompleted) {
+ deadlockDetected.set(true);
+ log.error("Test timed out - potential deadlock detected. Writer: {}, Readers: {}, Schedulers: {}",
+ writerCompleted, readersCompleted, schedulersCompleted);
+ }
+ if (deadlockDetected.get()) {
+ Exception e = firstException.get();
+ if (e != null) {
+ throw new AssertionError("Deadlock or exception detected during mixed operations test", e);
+ } else {
+ throw new AssertionError("Deadlock detected - test did not complete within timeout");
+ }
+ }
+
+ // Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
+ //         assertEquals(deliverableMessagesCount.get(), totalMessages));
Review Comment:
[nitpick] Commented-out code at lines 1006-1007 should be removed if not
needed. If this code serves as documentation or is planned for future use, add
a comment explaining why it's retained.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]