This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 9eb89864957 [fix][broker]Fix thread safety issues in BucketDelayedDeliveryTracker with StampedLock optimistic reads (#24542) 9eb89864957 is described below commit 9eb89864957b3ca38f8a6cde2125662ce4f6ac7e Author: Apurva007 <apurvatelan...@gmail.com> AuthorDate: Mon Jul 21 23:17:01 2025 -0700 [fix][broker]Fix thread safety issues in BucketDelayedDeliveryTracker with StampedLock optimistic reads (#24542) --- ...ucketDelayedDeliveryTrackerSimpleBenchmark.java | 408 +++++++++++++++++++ .../pulsar/broker/delayed/bucket/package-info.java | 27 ++ .../bucket/BucketDelayedDeliveryTracker.java | 96 +++-- ...cketDelayedDeliveryTrackerThreadSafetyTest.java | 433 +++++++++++++++++++++ 4 files changed, 942 insertions(+), 22 deletions(-) diff --git a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java new file mode 100644 index 00000000000..985e714d54d --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.StampedLock; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Simplified JMH Benchmarks for BucketDelayedDeliveryTracker thread safety improvements. + * This benchmark focuses on the core StampedLock optimistic read performance without + * complex dependencies on the full BucketDelayedDeliveryTracker implementation. 
+ * Run with: mvn exec:java -Dexec.mainClass="org.openjdk.jmh.Main" + * -Dexec.args="BucketDelayedDeliveryTrackerSimpleBenchmark" + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Benchmark) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) +@Fork(1) +public class BucketDelayedDeliveryTrackerSimpleBenchmark { + + @Param({"1", "2", "4", "8", "16"}) + public int threadCount; /* NOTE(review): no benchmark method reads threadCount. Every method fixes its own thread count with @Threads, so this @Param only makes JMH repeat each benchmark once per listed value (five identical runs). Consider removing it, or dropping the per-method @Threads and supplying the thread count on the JMH command line. */ + + private StampedLock stampedLock; + private boolean testData = true; /* Shared flag: a plain (non-volatile) field read inside the optimistic-read sections and flipped only while holding stampedLock.writeLock(). */ + private volatile long counter = 0; /* Incremented while holding the write lock; no benchmark reads it back. */ + + @Setup(Level.Trial) + public void setup() throws Exception { + stampedLock = new StampedLock(); + } + + @TearDown(Level.Trial) + public void tearDown() throws Exception { + // Cleanup if needed + } + + // ============================================================================= + // STAMPED LOCK OPTIMISTIC READ BENCHMARKS + // ============================================================================= + + @Benchmark + @Threads(1) + public boolean benchmarkOptimisticReadSingleThreaded() { + // Simulate optimistic read like in containsMessage(). No method in this group takes the write lock (and JMH runs one benchmark method at a time), so validate() always succeeds here and only the optimistic fast path is measured. + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; // Simulate reading shared data + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } + + @Benchmark + @Threads(2) + public boolean benchmarkOptimisticReadMultiThreaded() { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } + + @Benchmark + @Threads(8) + public boolean benchmarkOptimisticReadHighConcurrency() { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result =
testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } + + @Benchmark + @Threads(16) + public boolean benchmarkOptimisticReadExtremeConcurrency() { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } + + // ============================================================================= + // READ:WRITE RATIO BENCHMARKS (as requested) + // ============================================================================= + + @Benchmark + @Threads(4) + public boolean benchmarkReadWrite10_90() { + // 10:90 read:write ratio simulation. Each call in the ratio benchmarks draws one ThreadLocalRandom value to pick a read or a write, so that draw is included in every measured operation. + if (ThreadLocalRandom.current().nextInt(100) < 10) { + // Read operation + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } else { + // Write operation + long stamp = stampedLock.writeLock(); + try { + testData = !testData; + counter++; + return testData; + } finally { + stampedLock.unlockWrite(stamp); + } + } + } + + @Benchmark + @Threads(4) + public boolean benchmarkReadWrite20_80() { + // 20:80 read:write ratio + if (ThreadLocalRandom.current().nextInt(100) < 20) { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } else { + long stamp = stampedLock.writeLock(); + try { + testData = !testData; + counter++; + return testData; + } finally { + stampedLock.unlockWrite(stamp); + } + } + } + + @Benchmark + @Threads(4) + public boolean benchmarkReadWrite40_60() { + // 40:60 read:write ratio + if
(ThreadLocalRandom.current().nextInt(100) < 40) { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } else { + long stamp = stampedLock.writeLock(); + try { + testData = !testData; + counter++; + return testData; + } finally { + stampedLock.unlockWrite(stamp); + } + } + } + + @Benchmark + @Threads(4) + public boolean benchmarkReadWrite50_50() { + // 50:50 read:write ratio + if (ThreadLocalRandom.current().nextInt(100) < 50) { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } else { + long stamp = stampedLock.writeLock(); + try { + testData = !testData; + counter++; + return testData; + } finally { + stampedLock.unlockWrite(stamp); + } + } + } + + @Benchmark + @Threads(4) + public boolean benchmarkReadWrite60_40() { + // 60:40 read:write ratio + if (ThreadLocalRandom.current().nextInt(100) < 60) { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } else { + long stamp = stampedLock.writeLock(); + try { + testData = !testData; + counter++; + return testData; + } finally { + stampedLock.unlockWrite(stamp); + } + } + } + + @Benchmark + @Threads(4) + public boolean benchmarkReadWrite80_20() { + // 80:20 read:write ratio + if (ThreadLocalRandom.current().nextInt(100) < 80) { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { +
stampedLock.unlockRead(stamp); + } + } + return result; + } else { + long stamp = stampedLock.writeLock(); + try { + testData = !testData; + counter++; + return testData; + } finally { + stampedLock.unlockWrite(stamp); + } + } + } + + @Benchmark + @Threads(4) + public boolean benchmarkReadWrite90_10() { + // 90:10 read:write ratio - most realistic for production + if (ThreadLocalRandom.current().nextInt(100) < 90) { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } else { + long stamp = stampedLock.writeLock(); + try { + testData = !testData; + counter++; + return testData; + } finally { + stampedLock.unlockWrite(stamp); + } + } + } + + // ============================================================================= + // HIGH CONCURRENCY SCENARIOS + // ============================================================================= + + @Benchmark + @Threads(8) + public boolean benchmarkReadWrite90_10_HighConcurrency() { + // 90:10 read:write ratio with high concurrency + if (ThreadLocalRandom.current().nextInt(100) < 90) { + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } else { + long stamp = stampedLock.writeLock(); + try { + testData = !testData; + counter++; + return testData; + } finally { + stampedLock.unlockWrite(stamp); + } + } + } + + @Benchmark + @Threads(16) + public boolean benchmarkOptimisticReadContention() { + // High contention scenario to test optimistic read fallback behavior. NOTE(review): nothing takes the write lock while this benchmark runs (JMH runs one benchmark method at a time), so validate() cannot fail and the readLock fallback below is never exercised; Thread.yield() does not invalidate the stamp. + long stamp = stampedLock.tryOptimisticRead(); + boolean result = testData; + + // Simulate some computation + if (ThreadLocalRandom.current().nextInt(1000) == 0) { +
Thread.yield(); // Occasionally yield to increase contention + } + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = testData; + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } +} \ No newline at end of file diff --git a/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java new file mode 100644 index 00000000000..188ce0be224 --- /dev/null +++ b/microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Microbenchmarks for delayed message delivery bucket implementation. + * + * <p>This package contains JMH benchmarks for testing the performance + * characteristics of the BucketDelayedDeliveryTracker, particularly + * focusing on thread safety improvements with StampedLock optimistic reads.
+ */ +package org.apache.pulsar.broker.delayed.bucket; \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 686daa92d13..3f0fcc51657 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -22,10 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.mledger.ManagedCursor.CURSOR_INTERNAL_PROPERTY_PREFIX; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.HashBasedTable; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; -import com.google.common.collect.Table; import com.google.common.collect.TreeRangeMap; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -40,9 +38,12 @@ import java.util.NavigableSet; import java.util.Optional; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.StampedLock; import java.util.stream.Collectors; import javax.annotation.concurrent.ThreadSafe; import lombok.Getter; @@ -67,6 +68,12 @@ import org.roaringbitmap.RoaringBitmap; @ThreadSafe public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { + /** + * Record to represent a snapshot key with ledger ID and entry ID. + * Avoids overhead of creating String instances for keys. 
+ */ + public static record SnapshotKey(long ledgerId, long entryId) {} /* Key type for snapshotSegmentLastIndexMap; the record's generated equals() and hashCode() compare both ledgerId and entryId. */ + public static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket"; static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null); @@ -85,7 +92,10 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private final int maxNumBuckets; - private volatile long numberDelayedMessages; + private final AtomicLong numberDelayedMessages = new AtomicLong(0); + + // Thread safety locks + private final StampedLock stampedLock = new StampedLock(); /* NOTE(review): no code in this class acquires stampedLock in write mode; its only uses are tryOptimisticRead(), validate() and readLock() in nextDeliveryTime() and containsMessage(). With no writer, validate() always succeeds, so those readers get no exclusion against code that mutates lastMutableBucket, immutableBuckets or sharedBucketPriorityQueue. Confirm whether every mutator should take writeLock(), or whether the readers should stay synchronized. */ @Getter @VisibleForTesting @@ -99,7 +109,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker @VisibleForTesting private final RangeMap<Long, ImmutableBucket> immutableBuckets; - private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable; + private final ConcurrentHashMap<SnapshotKey, ImmutableBucket> snapshotSegmentLastIndexMap; private final BucketDelayedMessageIndexStats stats; @@ -131,7 +141,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker this.maxNumBuckets = maxNumBuckets; this.sharedBucketPriorityQueue = new TripleLongPriorityQueue(); this.immutableBuckets = TreeRangeMap.create(); - this.snapshotSegmentLastIndexTable = HashBasedTable.create(); + this.snapshotSegmentLastIndexMap = new ConcurrentHashMap<>(); this.lastMutableBucket = new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(), bucketSnapshotStorage); @@ -139,7 +149,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker // Close the tracker if failed to recover.
try { - this.numberDelayedMessages = recoverBucketSnapshot(); + long recoveredMessages = recoverBucketSnapshot(); + this.numberDelayedMessages.set(recoveredMessages); } catch (RecoverDelayedDeliveryTrackerException e) { close(); throw e; @@ -203,8 +214,9 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker toBeDeletedBucketMap.put(key, immutableBucket); } else { DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1); - this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), - lastDelayedIndex.getEntryId(), immutableBucket); + this.snapshotSegmentLastIndexMap.put( + new SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId()), + immutableBucket); for (DelayedIndex index : indexList) { this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), index.getEntryId()); @@ -305,7 +317,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker immutableBucket); DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight(); - snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), + snapshotSegmentLastIndexMap.put( + new SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId()), immutableBucket); immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> { @@ -341,8 +354,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId); immutableBuckets.asMapOfRanges().remove( Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId)); - snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(), - lastDelayedIndex.getTimestamp()); + snapshotSegmentLastIndexMap.remove( + new SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId())); } return INVALID_BUCKET_ID; }); @@ -392,7 +405,7 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker lastMutableBucket.addMessage(ledgerId, entryId, deliverAt); } - numberDelayedMessages++; + numberDelayedMessages.incrementAndGet(); if (log.isDebugEnabled()) { log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, @@ -565,6 +578,23 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker @Override protected long nextDeliveryTime() { + // Use optimistic read for frequently called method. NOTE(review): stampedLock is never write-locked in this class, so validate() always succeeds and nextDeliveryTimeUnsafe() effectively runs with no lock; unless every caller already holds the tracker monitor, its reads of lastMutableBucket and sharedBucketPriorityQueue can race with writers. Optimistic-mode reads may also observe wildly inconsistent state (see the StampedLock javadoc), so calling into mutable data structures before validate() is risky even once writers do lock. + long stamp = stampedLock.tryOptimisticRead(); + long result = nextDeliveryTimeUnsafe(); + + + if (!stampedLock.validate(stamp)) { + stamp = stampedLock.readLock(); + try { + result = nextDeliveryTimeUnsafe(); + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } + + private long nextDeliveryTimeUnsafe() { /* Former body of nextDeliveryTime(); presumably relies on its caller for synchronization. TODO confirm. */ if (lastMutableBucket.isEmpty() && !sharedBucketPriorityQueue.isEmpty()) { return sharedBucketPriorityQueue.peekN1(); } else if (sharedBucketPriorityQueue.isEmpty() && !lastMutableBucket.isEmpty()) { @@ -577,7 +607,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker @Override public long getNumberOfDelayedMessages() { - return numberDelayedMessages; + return numberDelayedMessages.get(); } @Override @@ -611,7 +641,9 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker long ledgerId = sharedBucketPriorityQueue.peekN2(); long entryId = sharedBucketPriorityQueue.peekN3(); - ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); + SnapshotKey snapshotKey = new SnapshotKey(ledgerId, entryId); + + ImmutableBucket bucket = snapshotSegmentLastIndexMap.get(snapshotKey); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { // All message of current snapshot segment are scheduled, try load next snapshot segment if (bucket.merging) { @@ -637,7 +669,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker CompletableFuture<Void>
loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() .thenAccept(indexList -> { synchronized (BucketDelayedDeliveryTracker.this) { - this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); + this.snapshotSegmentLastIndexMap.remove(snapshotKey); if (CollectionUtils.isEmpty(indexList)) { immutableBuckets.asMapOfRanges() .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); @@ -646,8 +678,9 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker } DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1); - this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), - lastDelayedIndex.getEntryId(), bucket); + this.snapshotSegmentLastIndexMap.put( + new SnapshotKey(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId()), + bucket); for (DelayedIndex index : indexList) { sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), index.getEntryId()); @@ -689,7 +722,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker removeIndexBit(ledgerId, entryId); --n; - --numberDelayedMessages; + numberDelayedMessages.decrementAndGet(); } updateTimer(); @@ -715,8 +748,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker CompletableFuture<Void> future = cleanImmutableBuckets(); sharedBucketPriorityQueue.clear(); lastMutableBucket.clear(); - snapshotSegmentLastIndexTable.clear(); - numberDelayedMessages = 0; + snapshotSegmentLastIndexMap.clear(); + numberDelayedMessages.set(0); return future; } @@ -740,7 +773,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker while (iterator.hasNext()) { ImmutableBucket bucket = iterator.next(); futures.add(bucket.clear(stats)); - numberDelayedMessages -= bucket.getNumberBucketDelayedMessages(); + numberDelayedMessages.addAndGet(-bucket.getNumberBucketDelayedMessages()); iterator.remove(); } return FutureUtil.waitForAll(futures); @@ -755,7 +788,25 @@ 
public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker .orElse(false); } - public synchronized boolean containsMessage(long ledgerId, long entryId) { + public boolean containsMessage(long ledgerId, long entryId) { + // Try optimistic read first for best performance. NOTE(review): this method was previously declared synchronized. stampedLock is never write-locked in this class, so validate() always succeeds and containsMessageUnsafe() now runs unsynchronized against writers of lastMutableBucket and immutableBuckets (a Guava TreeRangeMap, which is not documented as thread-safe). Either make those writers take stampedLock.writeLock() or keep this method synchronized. + long stamp = stampedLock.tryOptimisticRead(); + boolean result = containsMessageUnsafe(ledgerId, entryId); + + + if (!stampedLock.validate(stamp)) { + // Fall back to read lock if validation fails + stamp = stampedLock.readLock(); + try { + result = containsMessageUnsafe(ledgerId, entryId); + } finally { + stampedLock.unlockRead(stamp); + } + } + return result; + } + + private boolean containsMessageUnsafe(long ledgerId, long entryId) { /* Former body of containsMessage(); presumably relies on its caller for synchronization. TODO confirm. */ if (lastMutableBucket.containsMessage(ledgerId, entryId)) { return true; } @@ -764,6 +815,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker .orElse(false); } + public Map<String, TopicMetricBean> genTopicMetricMap() { stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1); stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java new file mode 100644 index 00000000000..3bc96499bfd --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerThreadSafetyTest.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.util.Timer; +import java.time.Clock; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Thread safety tests for BucketDelayedDeliveryTracker. 
+ * These tests verify that the hybrid approach with StampedLock and concurrent data structures + * correctly handles concurrent access patterns without deadlocks, race conditions, or data corruption. + */ +public class BucketDelayedDeliveryTrackerThreadSafetyTest { + + private BucketDelayedDeliveryTracker tracker; + private AbstractPersistentDispatcherMultipleConsumers dispatcher; + private ManagedCursor cursor; + private Timer timer; + private BucketSnapshotStorage storage; + private ExecutorService executorService; + + @BeforeMethod + public void setUp() throws Exception { + dispatcher = mock(AbstractPersistentDispatcherMultipleConsumers.class); + cursor = mock(ManagedCursor.class); + timer = mock(Timer.class); + storage = mock(BucketSnapshotStorage.class); + + when(dispatcher.getName()).thenReturn("persistent://public/default/test-topic / test-cursor"); + when(dispatcher.getCursor()).thenReturn(cursor); + when(cursor.getName()).thenReturn("test-cursor"); // Provide a valid cursor name + when(cursor.getCursorProperties()).thenReturn(java.util.Collections.emptyMap()); + + // Mock cursor property operations for bucket key storage + when(cursor.putCursorProperty(any(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(cursor.removeCursorProperty(any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + // Mock storage operations to avoid NullPointerException + when(storage.createBucketSnapshot(any(), any(), any(), any(), any())) + .thenReturn(CompletableFuture.completedFuture(1L)); + when(storage.getBucketSnapshotMetadata(anyLong())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(storage.getBucketSnapshotSegment(anyLong(), anyLong(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(java.util.Collections.emptyList())); + when(storage.getBucketSnapshotLength(anyLong())) + .thenReturn(CompletableFuture.completedFuture(0L)); + when(storage.deleteBucketSnapshot(anyLong())) + 
.thenReturn(CompletableFuture.completedFuture(null));
+
+        tracker = new BucketDelayedDeliveryTracker(
+                dispatcher, timer, 1000, Clock.systemUTC(), true, storage,
+                100, 1000, 100, 10 // Restore original minIndexCountPerBucket for proper testing
+        );
+
+        executorService = Executors.newFixedThreadPool(32);
+    }
+
+    /**
+     * Stops the worker pool before closing the tracker, so a task left running by a timed-out
+     * test cannot still be calling into a tracker that is being closed. The tracker is closed in
+     * {@code finally} so it is released even when the executor fails to terminate cleanly.
+     */
+    @AfterMethod
+    public void tearDown() throws Exception {
+        try {
+            if (executorService != null) {
+                assertTrue(MoreExecutors.shutdownAndAwaitTermination(executorService, 5, TimeUnit.SECONDS),
+                        "Executor should shutdown cleanly");
+            }
+        } finally {
+            if (tracker != null) {
+                tracker.close();
+            }
+        }
+    }
+
+    /**
+     * Test concurrent containsMessage() calls while adding messages.
+     * This tests the StampedLock optimistic read performance under contention.
+     */
+    @Test
+    public void testConcurrentContainsMessageWithWrites() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 1000; // Restore to test bucket creation properly
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        // Throwable rather than Exception so an AssertionError raised inside a worker is reported
+        // too; anything a submitted task throws without catching is silently kept in its Future.
+        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
+
+        // Start reader threads
+        for (int i = 0; i < numThreads / 2; i++) {
+            final int threadId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        long ledgerId = threadId * 1000 + j;
+                        long entryId = j;
+                        // This should not throw exceptions or block indefinitely
+                        tracker.containsMessage(ledgerId, entryId);
+                    }
+                } catch (Exception | AssertionError e) {
+                    errors.incrementAndGet();
+                    firstFailure.compareAndSet(null, e);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start writer threads
+        for (int i = numThreads / 2; i < numThreads; i++) {
+            final int threadId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        long ledgerId = threadId * 1000 + j;
+                        long entryId = j;
+                        long deliverAt = System.currentTimeMillis() + 10000; // 10s delay
+                        tracker.addMessage(ledgerId, entryId, deliverAt);
+                    }
+                } catch (Exception | AssertionError e) {
+                    errors.incrementAndGet();
+                    firstFailure.compareAndSet(null, e);
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");
+
+        // Report the first failure once here instead of dumping a trace from every worker.
+        Throwable failure = firstFailure.get();
+        if (failure != null) {
+            System.err.println("First exception caught: " + failure.getMessage());
+            failure.printStackTrace();
+        }
+        assertEquals(errors.get(), 0, "No exceptions should occur during concurrent operations");
+    }
+
+    /**
+     * Test concurrent nextDeliveryTime() calls.
+     * This verifies the StampedLock implementation for read-heavy operations.
+     */
+    @Test
+    public void testConcurrentNextDeliveryTime() throws Exception {
+        // Add some messages first
+        for (int i = 0; i < 100; i++) {
+            tracker.addMessage(i, i, System.currentTimeMillis() + (i * 1000));
+        }
+
+        final int numThreads = 20;
+        final int callsPerThread = 10000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicLong totalCalls = new AtomicLong(0);
+
+        for (int i = 0; i < numThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < callsPerThread; j++) {
+                        long nextTime = tracker.nextDeliveryTime();
+                        assertTrue(nextTime > 0, "Next delivery time should be positive");
+                        totalCalls.incrementAndGet();
+                    }
+                } catch (Exception | AssertionError e) {
+                    // assertTrue reports failure with an AssertionError (an Error, not an Exception);
+                    // without catching it here the failed check would be swallowed by the executor.
+                    errors.incrementAndGet();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should complete within 30 seconds");
+        assertEquals(errors.get(), 0, "No exceptions should occur");
+        assertEquals(totalCalls.get(), numThreads * callsPerThread, "All calls should complete");
+    }
+
+    /**
+     * Test for deadlock detection with mixed read/write operations.
+     * This simulates the real-world scenario where containsMessage() is called
+     * from read threads while write operations modify the tracker state.
+     */
+    @Test
+    public void testDeadlockDetection() throws Exception {
+        final int numThreads = 32;
+        final int operationsPerThread = 100;
+        // Use Phaser for better concurrency coordination
+        final Phaser startPhaser = new Phaser(numThreads + 1); // +1 for main thread
+        final Phaser endPhaser = new Phaser(numThreads + 1); // +1 for main thread
+        final AtomicBoolean deadlockDetected = new AtomicBoolean(false);
+        // Kept apart from deadlockDetected so a failing run says whether it hung or threw.
+        final AtomicReference<Throwable> unexpectedFailure = new AtomicReference<>();
+        final AtomicInteger completedOperations = new AtomicInteger(0);
+
+        // Mixed workload: reads, writes, and metric queries
+        for (int i = 0; i < numThreads; i++) {
+            final int threadId = i;
+            final int workloadType = i % 4;
+
+            executorService.submit(() -> {
+                try {
+                    // Wait for all threads to be ready
+                    startPhaser.arriveAndAwaitAdvance();
+
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        try {
+                            switch (workloadType) {
+                                case 0: // containsMessage calls
+                                    tracker.containsMessage(threadId * 1000 + j, j);
+                                    break;
+                                case 1: // addMessage calls
+                                    tracker.addMessage(threadId * 1000 + j, j, System.currentTimeMillis() + 5000);
+                                    break;
+                                case 2: // nextDeliveryTime calls
+                                    tracker.nextDeliveryTime();
+                                    break;
+                                case 3: // getNumberOfDelayedMessages calls
+                                    tracker.getNumberOfDelayedMessages();
+                                    break;
+                            }
+                            completedOperations.incrementAndGet();
+                        } catch (IllegalArgumentException e) {
+                            // IllegalArgumentException is expected for some operations
+                            // (e.g., calling nextDeliveryTime on empty queue, invalid ledger IDs)
+                            // This is not a deadlock, just normal validation
+                            completedOperations.incrementAndGet();
+                        }
+                    }
+                } catch (Exception | AssertionError e) {
+                    // IllegalArgumentException is already handled per operation above, so anything
+                    // reaching this point is an unexpected failure.
+                    unexpectedFailure.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    // Signal completion
+                    endPhaser.arriveAndDeregister();
+                }
+            });
+        }
+
+        // Start all threads at once
+        startPhaser.arriveAndAwaitAdvance();
+
+        // Wait for all threads to complete with timeout to detect potential deadlocks
+        try {
+            endPhaser.awaitAdvanceInterruptibly(endPhaser.arrive(), 60, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Timeout or interrupt indicates potential deadlock
+            deadlockDetected.set(true);
+            e.printStackTrace();
+        }
+
+        assertTrue(!deadlockDetected.get(), "No deadlocks should be detected");
+        assertTrue(unexpectedFailure.get() == null,
+                "No unexpected exceptions should occur: " + unexpectedFailure.get());
+        // Every iteration is counted exactly once, whether it succeeded or was rejected with
+        // IllegalArgumentException, so with no failure and no timeout all of them must be counted.
+        assertEquals(completedOperations.get(), numThreads * operationsPerThread,
+                "All operations should complete");
+    }
+
+    /**
+     * Test data consistency under high concurrency.
+     * Verifies that the hybrid approach maintains data integrity.
+     */
+    @Test
+    public void testDataConsistencyUnderConcurrency() throws Exception {
+        final int numWriteThreads = 8;
+        final int numReadThreads = 16;
+        final int messagesPerWriter = 500;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch writersDone = new CountDownLatch(numWriteThreads);
+        final CountDownLatch readersDone = new CountDownLatch(numReadThreads);
+        final AtomicInteger totalMessagesAdded = new AtomicInteger(0);
+        // A failing containsMessage() is exactly the thread-safety bug this test looks for.
+        final AtomicReference<Throwable> readerFailure = new AtomicReference<>();
+
+        // Writer threads add messages
+        for (int i = 0; i < numWriteThreads; i++) {
+            final int writerId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < messagesPerWriter; j++) {
+                        long ledgerId = writerId * 10000 + j;
+                        long entryId = j;
+                        boolean added = tracker.addMessage(ledgerId, entryId, System.currentTimeMillis() + 30000);
+                        if (added) {
+                            totalMessagesAdded.incrementAndGet();
+                        }
+                    }
+                } catch (Exception ignored) {
+                    // Deliberately ignored: concurrent writers interleave ledger IDs, which addMessage
+                    // may reject (presumably with IllegalArgumentException, which testDeadlockDetection
+                    // also tolerates). Only calls that returned true are counted above.
+                } finally {
+                    writersDone.countDown();
+                }
+            });
+        }
+
+        // Reader threads check for messages
+        for (int i = 0; i < numReadThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+
+                    // Read for a while to catch messages being added
+                    long endTime = System.currentTimeMillis() + 5000; // Read for 5 seconds
+                    while (System.currentTimeMillis() < endTime) {
+                        for (int writerId = 0; writerId < numWriteThreads; writerId++) {
+                            for (int j = 0; j < messagesPerWriter; j++) {
+                                long ledgerId = writerId * 10000 + j;
+                                long entryId = j;
+                                tracker.containsMessage(ledgerId, entryId);
+                            }
+                        }
+                        Thread.sleep(10); // Small delay to allow writes
+                    }
+                } catch (Exception | AssertionError e) {
+                    readerFailure.compareAndSet(null, e);
+                } finally {
+                    readersDone.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+
+        assertTrue(writersDone.await(30, TimeUnit.SECONDS), "Writers should complete");
+        assertTrue(readersDone.await(30, TimeUnit.SECONDS), "Readers should complete");
+        assertTrue(readerFailure.get() == null,
+                "containsMessage() should not fail under concurrent writes: " + readerFailure.get());
+
+        // The exact counts may vary due to timing, but we should have some successful operations
+        assertTrue(totalMessagesAdded.get() > 0, "Some messages should have been added");
+
+        // Every accepted message is scheduled 30s ahead and this test never reads scheduled messages
+        // back, so the tracker should still hold exactly the messages whose addMessage returned true.
+        long finalMessageCount = tracker.getNumberOfDelayedMessages();
+        assertEquals(finalMessageCount, totalMessagesAdded.get(),
+                "Tracker should hold every accepted message");
+    }
+
+    /**
+     * Test optimistic read performance under varying contention levels.
+     * This helps validate that the StampedLock optimistic reads are working efficiently.
+     */
+    @Test
+    public void testOptimisticReadPerformance() throws Exception {
+        // Add baseline messages
+        for (int i = 0; i < 1000; i++) {
+            tracker.addMessage(i, i, System.currentTimeMillis() + 60000);
+        }
+
+        final int[] threadCounts = {1, 2, 4, 8, 16};
+
+        for (int numThreads : threadCounts) {
+            final int readsPerThread = 10000;
+            final CountDownLatch startLatch = new CountDownLatch(1);
+            final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+            final AtomicLong totalReads = new AtomicLong(0);
+            final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
+
+            for (int i = 0; i < numThreads; i++) {
+                final int threadId = i;
+                executorService.submit(() -> {
+                    try {
+                        startLatch.await();
+                        for (int j = 0; j < readsPerThread; j++) {
+                            // Mix of existing and non-existing messages
+                            long ledgerId = (threadId * readsPerThread + j) % 2000;
+                            long entryId = j % 1000;
+                            tracker.containsMessage(ledgerId, entryId);
+                            totalReads.incrementAndGet();
+                        }
+                    } catch (Exception | AssertionError e) {
+                        // Recorded rather than ignored: otherwise a failing read would only show up
+                        // as a lower read count in the throughput figure.
+                        firstFailure.compareAndSet(null, e);
+                    } finally {
+                        doneLatch.countDown();
+                    }
+                });
+            }
+
+            // Start timing only after every task is submitted, so submission overhead is not measured.
+            long startTime = System.nanoTime();
+            startLatch.countDown();
+            assertTrue(doneLatch.await(30, TimeUnit.SECONDS),
+                    "Performance test with " + numThreads + " threads should complete");
+
+            long endTime = System.nanoTime();
+            long duration = endTime - startTime;
+            double throughput = (totalReads.get() * 1_000_000_000.0) / duration;
+
+            System.out.printf("Threads: %d, Reads: %d, Throughput: %.0f ops/sec%n",
+                    numThreads, totalReads.get(), throughput);
+
+            // Only reads run in this phase, so none of them should fail and all should complete.
+            assertTrue(firstFailure.get() == null,
+                    "Reads should not fail with " + numThreads + " threads: " + firstFailure.get());
+            assertEquals(totalReads.get(), (long) numThreads * readsPerThread,
+                    "All reads should complete with " + numThreads + " threads");
+
+            // Basic sanity check - should achieve reasonable throughput
+            assertTrue(throughput > 10000, "Should achieve at least 10K ops/sec with " + numThreads + " threads");
+        }
+    }
+}
\ No newline at end of file