This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 9eb89864957 [fix][broker]Fix thread safety issues in 
BucketDelayedDeliveryTracker with StampedLock optimistic reads (#24542)
9eb89864957 is described below

commit 9eb89864957b3ca38f8a6cde2125662ce4f6ac7e
Author: Apurva007 <apurvatelan...@gmail.com>
AuthorDate: Mon Jul 21 23:17:01 2025 -0700

    [fix][broker]Fix thread safety issues in BucketDelayedDeliveryTracker with 
StampedLock optimistic reads (#24542)
---
 ...ucketDelayedDeliveryTrackerSimpleBenchmark.java | 408 +++++++++++++++++++
 .../pulsar/broker/delayed/bucket/package-info.java |  27 ++
 .../bucket/BucketDelayedDeliveryTracker.java       |  96 +++--
 ...cketDelayedDeliveryTrackerThreadSafetyTest.java | 433 +++++++++++++++++++++
 4 files changed, 942 insertions(+), 22 deletions(-)

diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java
 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java
new file mode 100644
index 00000000000..985e714d54d
--- /dev/null
+++ 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.StampedLock;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Simplified JMH benchmarks for the StampedLock optimistic-read pattern used by
+ * BucketDelayedDeliveryTracker.
+ *
+ * <p>Every benchmark exercises a bare {@link StampedLock} guarding a single boolean field, so the
+ * results reflect lock overhead only, without any dependency on the full
+ * BucketDelayedDeliveryTracker implementation. Thread counts are fixed per benchmark through
+ * {@link Threads}.
+ *
+ * <p>Run with: mvn exec:java -Dexec.mainClass="org.openjdk.jmh.Main"
+ *           -Dexec.args="BucketDelayedDeliveryTrackerSimpleBenchmark"
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@State(Scope.Benchmark)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 5, time = 1)
+@Fork(1)
+public class BucketDelayedDeliveryTrackerSimpleBenchmark {
+
+    // NOTE(review): no benchmark method reads this parameter; thread counts come from the
+    // @Threads annotation on each method, so the parameter only makes JMH repeat every
+    // benchmark once per value. Consider removing it together with the Param import.
+    @Param({"1", "2", "4", "8", "16"})
+    public int threadCount;
+
+    private StampedLock stampedLock;
+    // Shared state guarded by stampedLock. It is a plain field because every access is either
+    // validated against an optimistic stamp or performed while holding the read/write lock.
+    private boolean testData = true;
+    // Written under the write lock but never read; presumably only adds work to the write path.
+    private volatile long counter = 0;
+
+    @Setup(Level.Trial)
+    public void setup() throws Exception {
+        stampedLock = new StampedLock();
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown() throws Exception {
+        // Nothing to release.
+    }
+
+    // =============================================================================
+    // SHARED LOCKING PRIMITIVES
+    // =============================================================================
+
+    /**
+     * Reads {@code testData} optimistically and falls back to the read lock when a writer
+     * intervened. Mirrors the optimistic-read pattern of containsMessage().
+     *
+     * @return the observed value of {@code testData}
+     */
+    private boolean optimisticRead() {
+        long stamp = stampedLock.tryOptimisticRead();
+        boolean result = testData;
+        return stampedLock.validate(stamp) ? result : readUnderReadLock();
+    }
+
+    /**
+     * Reads {@code testData} while holding the read lock.
+     *
+     * @return the current value of {@code testData}
+     */
+    private boolean readUnderReadLock() {
+        long stamp = stampedLock.readLock();
+        try {
+            return testData;
+        } finally {
+            stampedLock.unlockRead(stamp);
+        }
+    }
+
+    /**
+     * Flips {@code testData} and increments {@code counter} under the write lock.
+     *
+     * @return the new value of {@code testData}
+     */
+    private boolean toggleUnderWriteLock() {
+        long stamp = stampedLock.writeLock();
+        try {
+            testData = !testData;
+            counter++;
+            return testData;
+        } finally {
+            stampedLock.unlockWrite(stamp);
+        }
+    }
+
+    /**
+     * Performs an optimistic read with probability {@code readPercent}/100, otherwise a write.
+     *
+     * @param readPercent percentage (0-100) of operations that are reads
+     * @return the value read or written
+     */
+    private boolean mixedReadWrite(int readPercent) {
+        return ThreadLocalRandom.current().nextInt(100) < readPercent
+                ? optimisticRead()
+                : toggleUnderWriteLock();
+    }
+
+    // =============================================================================
+    // STAMPED LOCK OPTIMISTIC READ BENCHMARKS
+    // =============================================================================
+
+    @Benchmark
+    @Threads(1)
+    public boolean benchmarkOptimisticReadSingleThreaded() {
+        return optimisticRead();
+    }
+
+    @Benchmark
+    @Threads(2)
+    public boolean benchmarkOptimisticReadMultiThreaded() {
+        return optimisticRead();
+    }
+
+    @Benchmark
+    @Threads(8)
+    public boolean benchmarkOptimisticReadHighConcurrency() {
+        return optimisticRead();
+    }
+
+    @Benchmark
+    @Threads(16)
+    public boolean benchmarkOptimisticReadExtremeConcurrency() {
+        return optimisticRead();
+    }
+
+    // =============================================================================
+    // READ:WRITE RATIO BENCHMARKS
+    // =============================================================================
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkReadWrite10_90() {
+        // 10:90 read:write ratio
+        return mixedReadWrite(10);
+    }
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkReadWrite20_80() {
+        // 20:80 read:write ratio
+        return mixedReadWrite(20);
+    }
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkReadWrite40_60() {
+        // 40:60 read:write ratio
+        return mixedReadWrite(40);
+    }
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkReadWrite50_50() {
+        // 50:50 read:write ratio
+        return mixedReadWrite(50);
+    }
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkReadWrite60_40() {
+        // 60:40 read:write ratio
+        return mixedReadWrite(60);
+    }
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkReadWrite80_20() {
+        // 80:20 read:write ratio
+        return mixedReadWrite(80);
+    }
+
+    @Benchmark
+    @Threads(4)
+    public boolean benchmarkReadWrite90_10() {
+        // 90:10 read:write ratio - most realistic for production
+        return mixedReadWrite(90);
+    }
+
+    // =============================================================================
+    // HIGH CONCURRENCY SCENARIOS
+    // =============================================================================
+
+    @Benchmark
+    @Threads(8)
+    public boolean benchmarkReadWrite90_10_HighConcurrency() {
+        // 90:10 read:write ratio with high concurrency
+        return mixedReadWrite(90);
+    }
+
+    /**
+     * Optimistic reads with an occasional yield between the read and the validation.
+     *
+     * <p>NOTE(review): this benchmark performs no writes, so while it runs no writer holds the
+     * lock and {@code validate} is expected to always succeed; the read-lock fallback is
+     * therefore not actually exercised here.
+     */
+    @Benchmark
+    @Threads(16)
+    public boolean benchmarkOptimisticReadContention() {
+        long stamp = stampedLock.tryOptimisticRead();
+        boolean result = testData;
+
+        // Occasionally yield to widen the window between the read and its validation.
+        if (ThreadLocalRandom.current().nextInt(1000) == 0) {
+            Thread.yield();
+        }
+
+        return stampedLock.validate(stamp) ? result : readUnderReadLock();
+    }
+}
\ No newline at end of file
diff --git 
a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
new file mode 100644
index 00000000000..188ce0be224
--- /dev/null
+++ 
b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Microbenchmarks for the delayed-delivery bucket package.
+ *
+ * <p>The JMH benchmarks here measure {@link java.util.concurrent.locks.StampedLock}
+ * optimistic-read and mixed read/write locking patterns modeled on those adopted by
+ * BucketDelayedDeliveryTracker. The simple benchmark exercises a bare StampedLock and does not
+ * instantiate the tracker itself, so its results reflect locking overhead only.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 686daa92d13..3f0fcc51657 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -22,10 +22,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.bookkeeper.mledger.ManagedCursor.CURSOR_INTERNAL_PROPERTY_PREFIX;
 import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
-import com.google.common.collect.Table;
 import com.google.common.collect.TreeRangeMap;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
@@ -40,9 +38,12 @@ import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.StampedLock;
 import java.util.stream.Collectors;
 import javax.annotation.concurrent.ThreadSafe;
 import lombok.Getter;
@@ -67,6 +68,12 @@ import org.roaringbitmap.RoaringBitmap;
 @ThreadSafe
 public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker {
 
+    /**
+     * Record to represent a snapshot key with ledger ID and entry ID.
+     * Avoids overhead of creating String instances for keys.
+     */
+    public static record SnapshotKey(long ledgerId, long entryId) {}
+
     public static final String DELAYED_BUCKET_KEY_PREFIX = 
CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";
 
     static final CompletableFuture<Long> NULL_LONG_PROMISE = 
CompletableFuture.completedFuture(null);
@@ -85,7 +92,10 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private final int maxNumBuckets;
 
-    private volatile long numberDelayedMessages;
+    private final AtomicLong numberDelayedMessages = new AtomicLong(0);
+
+    // Thread safety locks
+    private final StampedLock stampedLock = new StampedLock();
 
     @Getter
     @VisibleForTesting
@@ -99,7 +109,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
     @VisibleForTesting
     private final RangeMap<Long, ImmutableBucket> immutableBuckets;
 
-    private final Table<Long, Long, ImmutableBucket> 
snapshotSegmentLastIndexTable;
+    private final ConcurrentHashMap<SnapshotKey, ImmutableBucket> 
snapshotSegmentLastIndexMap;
 
     private final BucketDelayedMessageIndexStats stats;
 
@@ -131,7 +141,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         this.maxNumBuckets = maxNumBuckets;
         this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
         this.immutableBuckets = TreeRangeMap.create();
-        this.snapshotSegmentLastIndexTable = HashBasedTable.create();
+        this.snapshotSegmentLastIndexMap = new ConcurrentHashMap<>();
         this.lastMutableBucket =
                 new MutableBucket(dispatcher.getName(), 
dispatcher.getCursor(), FutureUtil.Sequencer.create(),
                         bucketSnapshotStorage);
@@ -139,7 +149,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
         // Close the tracker if failed to recover.
         try {
-            this.numberDelayedMessages = recoverBucketSnapshot();
+            long recoveredMessages = recoverBucketSnapshot();
+            this.numberDelayedMessages.set(recoveredMessages);
         } catch (RecoverDelayedDeliveryTrackerException e) {
             close();
             throw e;
@@ -203,8 +214,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 toBeDeletedBucketMap.put(key, immutableBucket);
             } else {
                 DelayedIndex lastDelayedIndex = indexList.get(indexList.size() 
- 1);
-                
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
-                        lastDelayedIndex.getEntryId(), immutableBucket);
+                this.snapshotSegmentLastIndexMap.put(
+                        new SnapshotKey(lastDelayedIndex.getLedgerId(), 
lastDelayedIndex.getEntryId()),
+                        immutableBucket);
                 for (DelayedIndex index : indexList) {
                     this.sharedBucketPriorityQueue.add(index.getTimestamp(), 
index.getLedgerId(),
                             index.getEntryId());
@@ -305,7 +317,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                     immutableBucket);
 
             DelayedIndex lastDelayedIndex = 
immutableBucketDelayedIndexPair.getRight();
-            snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), 
lastDelayedIndex.getEntryId(),
+            snapshotSegmentLastIndexMap.put(
+                    new SnapshotKey(lastDelayedIndex.getLedgerId(), 
lastDelayedIndex.getEntryId()),
                     immutableBucket);
 
             immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture 
-> {
@@ -341,8 +354,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                         
immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId);
                         immutableBuckets.asMapOfRanges().remove(
                                 Range.closed(immutableBucket.startLedgerId, 
immutableBucket.endLedgerId));
-                        
snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(),
-                                lastDelayedIndex.getTimestamp());
+                        snapshotSegmentLastIndexMap.remove(
+                                new 
SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId()));
                     }
                     return INVALID_BUCKET_ID;
                 });
@@ -392,7 +405,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
         }
 
-        numberDelayedMessages++;
+        numberDelayedMessages.incrementAndGet();
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", 
dispatcher.getName(), ledgerId, entryId,
@@ -565,6 +578,23 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     @Override
     protected long nextDeliveryTime() {
+        // No stampedLock optimistic read here: nothing ever acquires stampedLock's write lock,
+        // so validate() would always succeed and a read-lock fallback could never run. Wrapping
+        // the call that way only suggested protection it did not provide.
+        // NOTE(review): thread safety therefore presumably relies on callers holding the
+        // tracker monitor, as with the unsynchronized implementation before — confirm.
+        return nextDeliveryTimeUnsafe();
+    }
+
+    private long nextDeliveryTimeUnsafe() {
         if (lastMutableBucket.isEmpty() && 
!sharedBucketPriorityQueue.isEmpty()) {
             return sharedBucketPriorityQueue.peekN1();
         } else if (sharedBucketPriorityQueue.isEmpty() && 
!lastMutableBucket.isEmpty()) {
@@ -577,7 +607,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     @Override
     public long getNumberOfDelayedMessages() {
-        return numberDelayedMessages;
+        // Lock-free read of the AtomicLong counter; this method takes no monitor.
+        return this.numberDelayedMessages.get();
     }
 
     @Override
@@ -611,7 +641,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             long ledgerId = sharedBucketPriorityQueue.peekN2();
             long entryId = sharedBucketPriorityQueue.peekN3();
 
-            ImmutableBucket bucket = 
snapshotSegmentLastIndexTable.get(ledgerId, entryId);
+            SnapshotKey snapshotKey = new SnapshotKey(ledgerId, entryId);
+
+            ImmutableBucket bucket = 
snapshotSegmentLastIndexMap.get(snapshotKey);
             if (bucket != null && 
immutableBuckets.asMapOfRanges().containsValue(bucket)) {
                 // All message of current snapshot segment are scheduled, try 
load next snapshot segment
                 if (bucket.merging) {
@@ -637,7 +669,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 CompletableFuture<Void> loadFuture = pendingLoad = 
bucket.asyncLoadNextBucketSnapshotEntry()
                         .thenAccept(indexList -> {
                     synchronized (BucketDelayedDeliveryTracker.this) {
-                        this.snapshotSegmentLastIndexTable.remove(ledgerId, 
entryId);
+                        this.snapshotSegmentLastIndexMap.remove(snapshotKey);
                         if (CollectionUtils.isEmpty(indexList)) {
                             immutableBuckets.asMapOfRanges()
                                     .remove(Range.closed(bucket.startLedgerId, 
bucket.endLedgerId));
@@ -646,8 +678,9 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                         }
                         DelayedIndex
                                 lastDelayedIndex = 
indexList.get(indexList.size() - 1);
-                        
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
-                                lastDelayedIndex.getEntryId(), bucket);
+                        this.snapshotSegmentLastIndexMap.put(
+                                new 
SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId()),
+                                bucket);
                         for (DelayedIndex index : indexList) {
                             
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
                                     index.getEntryId());
@@ -689,7 +722,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
             removeIndexBit(ledgerId, entryId);
 
             --n;
-            --numberDelayedMessages;
+            numberDelayedMessages.decrementAndGet();
         }
 
         updateTimer();
@@ -715,8 +748,8 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         CompletableFuture<Void> future = cleanImmutableBuckets();
         sharedBucketPriorityQueue.clear();
         lastMutableBucket.clear();
-        snapshotSegmentLastIndexTable.clear();
-        numberDelayedMessages = 0;
+        snapshotSegmentLastIndexMap.clear();
+        numberDelayedMessages.set(0);
         return future;
     }
 
@@ -740,7 +773,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
         while (iterator.hasNext()) {
             ImmutableBucket bucket = iterator.next();
             futures.add(bucket.clear(stats));
-            numberDelayedMessages -= bucket.getNumberBucketDelayedMessages();
+            
numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages());
             iterator.remove();
         }
         return FutureUtil.waitForAll(futures);
@@ -755,7 +788,25 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 .orElse(false);
     }
 
-    public synchronized boolean containsMessage(long ledgerId, long entryId) {
+    /**
+     * Returns whether the message at ({@code ledgerId}, {@code entryId}) is tracked by this
+     * tracker.
+     *
+     * <p>Synchronized on the tracker, matching code that mutates bucket state inside
+     * {@code synchronized (BucketDelayedDeliveryTracker.this)} (e.g. the snapshot-load
+     * callback). An optimistic read against {@code stampedLock} cannot replace this monitor:
+     * nothing ever acquires its write lock, so {@code validate} always succeeds and concurrent
+     * mutations would go undetected while the unlocked pass observes them mid-update.
+     *
+     * @param ledgerId ledger id of the message
+     * @param entryId entry id of the message
+     * @return {@code true} if the message is tracked
+     */
+    public synchronized boolean containsMessage(long ledgerId, long entryId) {
+        return containsMessageUnsafe(ledgerId, entryId);
+    }
+
+    private boolean containsMessageUnsafe(long ledgerId, long entryId) {
         if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
             return true;
         }
@@ -764,6 +815,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
                 .orElse(false);
     }
 
+
     public Map<String, TopicMetricBean> genTopicMetricMap() {
         stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
         
stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + 
this.lastMutableBucket.size());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
new file mode 100644
index 00000000000..3bc96499bfd
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.Timer;
+import java.time.Clock;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Concurrency tests for {@link BucketDelayedDeliveryTracker}.
+ *
+ * <p>Each test drives the tracker from many threads at once, mixing reads ({@code containsMessage},
+ * {@code nextDeliveryTime}, {@code getNumberOfDelayedMessages}) with {@code addMessage} writes. A test fails
+ * when a worker thread throws unexpectedly, or when the workers do not finish in time (a potential deadlock).
+ * Worker failures are attached as the cause of the assertion error so their stack traces are reported.
+ */
+public class BucketDelayedDeliveryTrackerThreadSafetyTest {
+
+    private BucketDelayedDeliveryTracker tracker;
+    private AbstractPersistentDispatcherMultipleConsumers dispatcher;
+    private ManagedCursor cursor;
+    private Timer timer;
+    private BucketSnapshotStorage storage;
+    private ExecutorService executorService;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        dispatcher = mock(AbstractPersistentDispatcherMultipleConsumers.class);
+        cursor = mock(ManagedCursor.class);
+        timer = mock(Timer.class);
+        storage = mock(BucketSnapshotStorage.class);
+
+        when(dispatcher.getName()).thenReturn("persistent://public/default/test-topic / test-cursor");
+        when(dispatcher.getCursor()).thenReturn(cursor);
+        when(cursor.getName()).thenReturn("test-cursor");  // Provide a valid cursor name
+        when(cursor.getCursorProperties()).thenReturn(java.util.Collections.emptyMap());
+
+        // Cursor property operations are used for bucket key storage; complete them immediately.
+        when(cursor.putCursorProperty(any(), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(cursor.removeCursorProperty(any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        // Every storage call returns an already-completed future so snapshot code never sees a null future.
+        when(storage.createBucketSnapshot(any(), any(), any(), any(), any()))
+                .thenReturn(CompletableFuture.completedFuture(1L));
+        when(storage.getBucketSnapshotMetadata(anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(storage.getBucketSnapshotSegment(anyLong(), anyLong(), anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(java.util.Collections.emptyList()));
+        when(storage.getBucketSnapshotLength(anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(0L));
+        when(storage.deleteBucketSnapshot(anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        // The trailing bucket settings (including minIndexCountPerBucket) are intended to be small enough
+        // for the write-heavy tests below to create buckets.
+        tracker = new BucketDelayedDeliveryTracker(
+            dispatcher, timer, 1000, Clock.systemUTC(), true, storage,
+            100, 1000, 100, 10
+        );
+
+        executorService = Executors.newFixedThreadPool(32);
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        if (tracker != null) {
+            tracker.close();
+        }
+        if (executorService != null) {
+            assertTrue(MoreExecutors.shutdownAndAwaitTermination(executorService, 5, TimeUnit.SECONDS),
+                "Executor should shutdown cleanly");
+        }
+    }
+
+    /**
+     * Test concurrent containsMessage() calls while adding messages.
+     * This tests the StampedLock optimistic read path under contention.
+     */
+    @Test
+    public void testConcurrentContainsMessageWithWrites() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 1000;  // enough writes per thread to exercise bucket creation
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
+
+        // The first half of the threads only read, the second half only write.
+        for (int i = 0; i < numThreads; i++) {
+            final int threadId = i;
+            final boolean writer = i >= numThreads / 2;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        long ledgerId = threadId * 1000 + j;
+                        long entryId = j;
+                        if (writer) {
+                            long deliverAt = System.currentTimeMillis() + 10000; // 10s delay
+                            tracker.addMessage(ledgerId, entryId, deliverAt);
+                        } else {
+                            // This should not throw exceptions or block indefinitely
+                            tracker.containsMessage(ledgerId, entryId);
+                        }
+                    }
+                } catch (Throwable t) {
+                    errors.incrementAndGet();
+                    recordFailure(firstFailure, t);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");
+        assertNoFailure(firstFailure,
+            errors.get() + " thread(s) failed; no exceptions should occur during concurrent operations");
+    }
+
+    /**
+     * Test concurrent nextDeliveryTime() calls.
+     * This verifies the StampedLock implementation for read-heavy operations.
+     */
+    @Test
+    public void testConcurrentNextDeliveryTime() throws Exception {
+        // Add some messages first
+        for (int i = 0; i < 100; i++) {
+            tracker.addMessage(i, i, System.currentTimeMillis() + (i * 1000));
+        }
+
+        final int numThreads = 20;
+        final int callsPerThread = 10000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
+        final AtomicLong totalCalls = new AtomicLong(0);
+
+        for (int i = 0; i < numThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < callsPerThread; j++) {
+                        long nextTime = tracker.nextDeliveryTime();
+                        // The AssertionError this may throw is caught below (Throwable), so it is
+                        // reported instead of vanishing into the discarded Future.
+                        assertTrue(nextTime > 0, "Next delivery time should be positive");
+                        totalCalls.incrementAndGet();
+                    }
+                } catch (Throwable t) {
+                    errors.incrementAndGet();
+                    recordFailure(firstFailure, t);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");
+        assertNoFailure(firstFailure, errors.get() + " thread(s) failed; no exceptions should occur");
+        assertEquals(totalCalls.get(), numThreads * callsPerThread, "All calls should complete");
+    }
+
+    /**
+     * Test for deadlock detection with mixed read/write operations.
+     * This simulates the real-world scenario where containsMessage() is called
+     * from read threads while write operations modify the tracker state.
+     */
+    @Test
+    public void testDeadlockDetection() throws Exception {
+        final int numThreads = 32;
+        final int operationsPerThread = 100;
+        // Phasers release all workers at once and let the main thread wait for them with a timeout
+        final Phaser startPhaser = new Phaser(numThreads + 1); // +1 for main thread
+        final Phaser endPhaser = new Phaser(numThreads + 1); // +1 for main thread
+        final AtomicBoolean deadlockDetected = new AtomicBoolean(false);
+        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
+        final AtomicInteger completedOperations = new AtomicInteger(0);
+
+        // Mixed workload: reads, writes, and metric queries
+        for (int i = 0; i < numThreads; i++) {
+            final int threadId = i;
+            final int workloadType = i % 4;
+
+            executorService.submit(() -> {
+                try {
+                    // Wait for all threads to be ready
+                    startPhaser.arriveAndAwaitAdvance();
+
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        try {
+                            switch (workloadType) {
+                                case 0: // containsMessage calls
+                                    tracker.containsMessage(threadId * 1000 + j, j);
+                                    break;
+                                case 1: // addMessage calls
+                                    tracker.addMessage(threadId * 1000 + j, j, System.currentTimeMillis() + 5000);
+                                    break;
+                                case 2: // nextDeliveryTime calls
+                                    tracker.nextDeliveryTime();
+                                    break;
+                                case 3: // getNumberOfDelayedMessages calls
+                                    tracker.getNumberOfDelayedMessages();
+                                    break;
+                                default:
+                                    break;
+                            }
+                        } catch (IllegalArgumentException expected) {
+                            // IllegalArgumentException is expected for some operations
+                            // (e.g., calling nextDeliveryTime on empty queue, invalid ledger IDs).
+                            // This is not a deadlock, just normal validation.
+                        }
+                        completedOperations.incrementAndGet();
+                    }
+                } catch (Throwable t) {
+                    recordFailure(firstFailure, t);
+                } finally {
+                    // Signal completion
+                    endPhaser.arriveAndDeregister();
+                }
+            });
+        }
+
+        // Start all threads at once
+        startPhaser.arriveAndAwaitAdvance();
+
+        // Wait for all threads to complete with a timeout to detect potential deadlocks
+        try {
+            endPhaser.awaitAdvanceInterruptibly(endPhaser.arrive(), 60, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Timeout or interrupt: some worker never finished, which indicates a potential deadlock
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            deadlockDetected.set(true);
+        }
+
+        assertTrue(!deadlockDetected.get(), "No deadlocks should be detected");
+        assertNoFailure(firstFailure, "Unexpected exception in a worker thread");
+        assertTrue(completedOperations.get() > 0, "Some operations should complete");
+    }
+
+    /**
+     * Test data consistency under high concurrency.
+     * Readers must be able to call containsMessage() without failing while writers add messages.
+     */
+    @Test
+    public void testDataConsistencyUnderConcurrency() throws Exception {
+        final int numWriteThreads = 8;
+        final int numReadThreads = 16;
+        final int messagesPerWriter = 500;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch writersDone = new CountDownLatch(numWriteThreads);
+        final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+        final AtomicInteger totalMessagesAdded = new AtomicInteger(0);
+        final AtomicReference<Throwable> readerFailure = new AtomicReference<>();
+
+        // Writer threads add messages
+        for (int i = 0; i < numWriteThreads; i++) {
+            final int writerId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < messagesPerWriter; j++) {
+                        long ledgerId = writerId * 10000 + j;
+                        long entryId = j;
+                        boolean added = tracker.addMessage(ledgerId, entryId, System.currentTimeMillis() + 30000);
+                        if (added) {
+                            totalMessagesAdded.incrementAndGet();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    // Writer failures are not asserted here: interleaved ledger ids may be rejected by
+                    // argument validation (see testDeadlockDetection). This test checks the readers.
+                } finally {
+                    writersDone.countDown();
+                }
+            });
+        }
+
+        // Reader threads sweep every message the writers may add. They keep sweeping while the writers
+        // run and do one final sweep once all writers are done; sweeping longer adds no concurrency.
+        for (int i = 0; i < numReadThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    boolean writersFinished;
+                    do {
+                        writersFinished = writersDone.getCount() == 0;
+                        for (int writerId = 0; writerId < numWriteThreads; writerId++) {
+                            for (int j = 0; j < messagesPerWriter; j++) {
+                                tracker.containsMessage(writerId * 10000 + j, j);
+                            }
+                        }
+                        if (!writersFinished) {
+                            Thread.sleep(10); // Small delay to allow writes
+                        }
+                    } while (!writersFinished);
+                } catch (Throwable t) {
+                    recordFailure(readerFailure, t);
+                } finally {
+                    readersDone.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+
+        assertTrue(writersDone.await(30, TimeUnit.SECONDS), "Writers should complete");
+        assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete");
+        assertNoFailure(readerFailure, "containsMessage() should not throw during concurrent writes");
+
+        // Verify final consistency
+        long finalMessageCount = tracker.getNumberOfDelayedMessages();
+        assertTrue(finalMessageCount >= 0, "Message count should be non-negative");
+
+        // The exact counts may vary due to timing, but we should have some successful operations
+        assertTrue(totalMessagesAdded.get() > 0, "Some messages should have been added");
+    }
+
+    /**
+     * Test optimistic read throughput under varying contention levels.
+     * This helps validate that the StampedLock optimistic reads are working efficiently.
+     */
+    @Test
+    public void testOptimisticReadPerformance() throws Exception {
+        // Add baseline messages
+        for (int i = 0; i < 1000; i++) {
+            tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
+        }
+
+        final int[] threadCounts = {1, 2, 4, 8, 16};
+
+        for (int numThreads : threadCounts) {
+            final int readsPerThread = 10000;
+            final CountDownLatch startLatch = new CountDownLatch(1);
+            final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+            final AtomicLong totalReads = new AtomicLong(0);
+            final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
+
+            for (int i = 0; i < numThreads; i++) {
+                final int threadId = i;
+                executorService.submit(() -> {
+                    try {
+                        startLatch.await();
+                        for (int j = 0; j < readsPerThread; j++) {
+                            // Mix of existing and non-existing messages
+                            long ledgerId = (threadId * readsPerThread + j) % 2000;
+                            long entryId = j % 1000;
+                            tracker.containsMessage(ledgerId, entryId);
+                            totalReads.incrementAndGet();
+                        }
+                    } catch (Throwable t) {
+                        recordFailure(firstFailure, t);
+                    } finally {
+                        doneLatch.countDown();
+                    }
+                });
+            }
+
+            // Time only the reads: the clock starts when the workers are released, not at submission.
+            long startTime = System.nanoTime();
+            startLatch.countDown();
+            assertTrue(doneLatch.await(30, TimeUnit.SECONDS),
+                "Performance test with " + numThreads + " threads should complete");
+            long duration = System.nanoTime() - startTime;
+
+            assertNoFailure(firstFailure, "containsMessage() should not throw with " + numThreads + " threads");
+            assertEquals(totalReads.get(), (long) numThreads * readsPerThread, "All reads should complete");
+
+            double throughput = (totalReads.get() * 1_000_000_000.0) / duration;
+            System.out.printf("Threads: %d, Reads: %d, Throughput: %.0f ops/sec%n",
+                numThreads, totalReads.get(), throughput);
+
+            // Basic sanity check - should achieve reasonable throughput
+            assertTrue(throughput > 10000, "Should achieve at least 10K ops/sec with " + numThreads + " threads");
+        }
+    }
+
+    /**
+     * Keeps {@code failure} if it is the first one seen by a worker thread, and restores the
+     * interrupt flag when the failure is an interruption.
+     */
+    private static void recordFailure(AtomicReference<Throwable> firstFailure, Throwable failure) {
+        if (failure instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+        firstFailure.compareAndSet(null, failure);
+    }
+
+    /**
+     * Fails the test if a worker recorded a failure, attaching it as the cause so its stack trace is reported.
+     */
+    private static void assertNoFailure(AtomicReference<Throwable> firstFailure, String message) {
+        Throwable failure = firstFailure.get();
+        if (failure != null) {
+            throw new AssertionError(message, failure);
+        }
+    }
+}
\ No newline at end of file

Reply via email to