This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new dd9a3a7bc4e [fix][broker] Fix concurrency bug in 
BucketDelayedDeliveryTracker (#25346)
dd9a3a7bc4e is described below

commit dd9a3a7bc4e9bd2aee902a8b84b5779d33f5ce53
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 18 06:55:26 2026 -0700

    [fix][broker] Fix concurrency bug in BucketDelayedDeliveryTracker (#25346)
---
 .../bucket/BucketDelayedDeliveryTracker.java       | 42 ++--------------------
 1 file changed, 2 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 3f0fcc51657..ff077e57340 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -43,7 +43,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.StampedLock;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.ThreadSafe;
 import lombok.Getter;
@@ -94,8 +93,6 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final AtomicLong numberDelayedMessages = new AtomicLong(0);
 
-    // Thread safety locks
-    private final StampedLock stampedLock = new StampedLock();
 
     @Getter
     @VisibleForTesting
@@ -577,24 +574,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
     }
 
     @Override
-    protected long nextDeliveryTime() {
-        // Use optimistic read for frequently called method
-        long stamp = stampedLock.tryOptimisticRead();
-        long result = nextDeliveryTimeUnsafe();
-
-
-        if (!stampedLock.validate(stamp)) {
-            stamp = stampedLock.readLock();
-            try {
-                result = nextDeliveryTimeUnsafe();
-            } finally {
-                stampedLock.unlockRead(stamp);
-            }
-        }
-        return result;
-    }
-
-    private long nextDeliveryTimeUnsafe() {
+    protected synchronized long nextDeliveryTime() {
         if (lastMutableBucket.isEmpty() && 
!sharedBucketPriorityQueue.isEmpty()) {
             return sharedBucketPriorityQueue.peekN1();
         } else if (sharedBucketPriorityQueue.isEmpty() && 
!lastMutableBucket.isEmpty()) {
@@ -788,25 +768,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 .orElse(false);
     }
 
-    public boolean containsMessage(long ledgerId, long entryId) {
-        // Try optimistic read first for best performance
-        long stamp = stampedLock.tryOptimisticRead();
-        boolean result = containsMessageUnsafe(ledgerId, entryId);
-
-
-        if (!stampedLock.validate(stamp)) {
-            // Fall back to read lock if validation fails
-            stamp = stampedLock.readLock();
-            try {
-                result = containsMessageUnsafe(ledgerId, entryId);
-            } finally {
-                stampedLock.unlockRead(stamp);
-            }
-        }
-        return result;
-    }
-
-    private boolean containsMessageUnsafe(long ledgerId, long entryId) {
+    public synchronized boolean containsMessage(long ledgerId, long entryId) {
         if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
             return true;
         }

Reply via email to