This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cbf4d2ad0d [fix][broker] Correct two race conditions in the tracker code and logic bug in InMemoryDelayedDeliveryTracker that failed with NoSuchElementException (#25681)
6cbf4d2ad0d is described below

commit 6cbf4d2ad0d1514bfd2885015084102c2896ea67
Author: Chris Hamons <[email protected]>
AuthorDate: Fri May 8 15:41:23 2026 -0500

    [fix][broker] Correct two race conditions in the tracker code and logic bug in InMemoryDelayedDeliveryTracker that failed with NoSuchElementException (#25681)
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    | 28 +++---
 .../PersistentDispatcherMultipleConsumers.java     | 10 +--
 ...rsistentDispatcherMultipleConsumersClassic.java | 10 +--
 .../delayed/InMemoryDeliveryTrackerTest.java       | 10 +++
 ...tentDispatcherMultipleConsumersClassicTest.java | 99 ++++++++++++++++++++++
 .../PersistentDispatcherMultipleConsumersTest.java | 99 ++++++++++++++++++++++
 6 files changed, 236 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 8da74a553dd..b1d4e8cecbd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -127,16 +127,24 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
             messagesHaveFixedDelay = false;
             return false;
         }
-            log.debug()
-                    .attr("ledgerId", ledgerId)
-                    .attr("entryId", entryId)
-                    .attr("deliveryInMs", () -> deliverAt - clock.millis())
-                    .log("Add message");
-                long timestamp = trimLowerBit(deliverAt, 
timestampPrecisionBitCnt);
-        delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>())
-                .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
-                .add(entryId);
-        delayedMessagesCount.incrementAndGet();
+
+        log.debug()
+                .attr("ledgerId", ledgerId)
+                .attr("entryId", entryId)
+                .attr("deliveryInMs", () -> deliverAt - clock.millis())
+                .log("Add message");
+        long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+
+        Roaring64Bitmap bitmap = delayedMessageMap.computeIfAbsent(timestamp, 
k -> new TreeMap<>())
+            .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
+        // Roaring64Bitmap does not store duplicates, so track whether it is a new element
+        // so we can keep delayedMessagesCount in sync
+        boolean isNew = !bitmap.contains(entryId);
+
+        if (isNew) {
+            bitmap.add(entryId);
+            delayedMessagesCount.incrementAndGet();
+        }
 
         updateTimer();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c569cf5b68c..9bcf9153572 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -427,7 +427,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis
         }
     }
 
-    protected Predicate<Position> 
createReadEntriesSkipConditionForNormalRead() {
+    protected synchronized Predicate<Position> 
createReadEntriesSkipConditionForNormalRead() {
         Predicate<Position> skipCondition = null;
         // Filter out and skip read delayed messages exist in 
DelayedDeliveryTracker
         if (delayedDeliveryTracker.isPresent()) {
@@ -1378,12 +1378,12 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractPersistentDis
     }
 
     @Override
-    public long getNumberOfDelayedMessages() {
+    public synchronized long getNumberOfDelayedMessages() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
     }
 
     @Override
-    public CompletableFuture<Void> clearDelayedMessages() {
+    public synchronized CompletableFuture<Void> clearDelayedMessages() {
         if (!topic.isDelayedDeliveryEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
@@ -1464,11 +1464,11 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractPersistentDis
     }
 
 
-    public long getDelayedTrackerMemoryUsage() {
+    public synchronized long getDelayedTrackerMemoryUsage() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
     }
 
-    public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
+    public synchronized Map<String, TopicMetricBean> 
getBucketDelayedIndexStats() {
         if (delayedDeliveryTracker.isEmpty()) {
             return Collections.emptyMap();
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
index 6a01add4f2e..276f8c038a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
@@ -1146,7 +1146,7 @@ public class PersistentDispatcherMultipleConsumersClassic extends AbstractPersis
     }
 
     @Override
-    public boolean trackDelayedDelivery(long ledgerId, long entryId, 
MessageMetadata msgMetadata) {
+    public synchronized boolean trackDelayedDelivery(long ledgerId, long 
entryId, MessageMetadata msgMetadata) {
         if (!topic.isDelayedDeliveryEnabled()) {
             // If broker has the feature disabled, always deliver messages 
immediately
             return false;
@@ -1212,12 +1212,12 @@ public class 
PersistentDispatcherMultipleConsumersClassic extends AbstractPersis
     }
 
     @Override
-    public long getNumberOfDelayedMessages() {
+    public synchronized long getNumberOfDelayedMessages() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
     }
 
     @Override
-    public CompletableFuture<Void> clearDelayedMessages() {
+    public synchronized CompletableFuture<Void> clearDelayedMessages() {
         if (!topic.isDelayedDeliveryEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
@@ -1291,11 +1291,11 @@ public class 
PersistentDispatcherMultipleConsumersClassic extends AbstractPersis
     }
 
 
-    public long getDelayedTrackerMemoryUsage() {
+    public synchronized long getDelayedTrackerMemoryUsage() {
         return 
delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
     }
 
-    public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
+    public synchronized Map<String, TopicMetricBean> 
getBucketDelayedIndexStats() {
         if (delayedDeliveryTracker.isEmpty()) {
             return Collections.emptyMap();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index e25595072d3..322992d7b1c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -274,4 +274,14 @@ public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest {
         tracker.close();
     }
 
+    @Test(dataProvider = "delayedTracker")
+    public void 
testAddMultipleMessagesSameWindow(InMemoryDelayedDeliveryTracker tracker) 
throws Exception {
+        tracker.addMessage(1, 1, 50);
+        tracker.addMessage(1, 1, 50);
+        tracker.addMessage(1, 1, 50);
+
+        clockTime.set(60);
+
+        tracker.getScheduledMessages(10);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
index 5c87c74d9bb..e81e4bb6c1c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassicTest.java
@@ -20,7 +20,12 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.carrotsearch.hppc.ObjectSet;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.CustomLog;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -34,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -152,4 +158,97 @@ public class 
PersistentDispatcherMultipleConsumersClassicTest extends SharedPuls
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testRaceConditionInTrackDelayedDelivery() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 2000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new 
AtomicReference<>();
+
+        final String topicName = newTopicName();
+        final String subscription = "s1";
+
+        // Needed to create the topic
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) getTopic(topicName, 
false).join().get();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+
+        PersistentDispatcherMultipleConsumersClassic dispatcher =
+            new PersistentDispatcherMultipleConsumersClassic(topic, cursor, 
sub);
+
+        // Align all writes to the same bucket
+        // This is the key which triggers the race condition
+        long deliverAt = System.currentTimeMillis() + 5000;
+
+        MessageMetadata messageMetadata = new MessageMetadata()
+            .setSequenceId(1)
+            .setProducerName("testProducer")
+            .setPartitionKeyB64Encoded(false)
+            .setPublishTime(System.currentTimeMillis())
+            .setDeliverAtTime(deliverAt);
+
+        @Cleanup("shutdown")
+        ExecutorService executorService = Executors.newFixedThreadPool(32);
+
+        // Start clear message thread
+        for (int i = 0; i < numThreads / 2; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.clearDelayedMessages();
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start track delayed delivery thread
+        for (int i = numThreads / 2; i < numThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.trackDelayedDelivery(1, 1, messageMetadata);
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should 
complete within 30 seconds");
+
+        if (errors.get() > 0) {
+            Exception exception = firstException.get();
+            if (exception != null) {
+                System.err.println("First exception caught: " + 
exception.getMessage());
+                exception.printStackTrace();
+            }
+        }
+        Assert.assertEquals(errors.get(), 0, "No exceptions should occur 
during concurrent operations");
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
index e5da7850dfd..cf91f29988e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
@@ -20,7 +20,12 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.carrotsearch.hppc.ObjectSet;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.CustomLog;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -34,6 +39,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -152,4 +158,97 @@ public class PersistentDispatcherMultipleConsumersTest 
extends SharedPulsarBaseT
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testRaceConditionInTrackDelayedDelivery() throws Exception {
+        final int numThreads = 16;
+        final int operationsPerThread = 2000;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicReference<Exception> firstException = new 
AtomicReference<>();
+
+        final String topicName = newTopicName();
+        final String subscription = "s1";
+
+        // Needed to create the topic
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName(subscription)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) getTopic(topicName, 
false).join().get();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+
+        PersistentDispatcherMultipleConsumers dispatcher =
+            new PersistentDispatcherMultipleConsumers(topic, cursor, sub);
+
+        // Align all writes to the same bucket
+        // This is the key which triggers the race condition
+        long deliverAt = System.currentTimeMillis() + 5000;
+
+        MessageMetadata messageMetadata = new MessageMetadata()
+            .setSequenceId(1)
+            .setProducerName("testProducer")
+            .setPartitionKeyB64Encoded(false)
+            .setPublishTime(System.currentTimeMillis())
+            .setDeliverAtTime(deliverAt);
+
+        @Cleanup("shutdown")
+        ExecutorService executorService = Executors.newFixedThreadPool(32);
+
+        // Start clear message thread
+        for (int i = 0; i < numThreads / 2; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.clearDelayedMessages();
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        // Start track delayed delivery thread
+        for (int i = numThreads / 2; i < numThreads; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    for (int j = 0; j < operationsPerThread; j++) {
+                        dispatcher.trackDelayedDelivery(1, 1, messageMetadata);
+                        Thread.sleep(1);
+                    }
+                } catch (Exception e) {
+                    errors.incrementAndGet();
+                    firstException.compareAndSet(null, e);
+                    e.printStackTrace();
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+        }
+
+        startLatch.countDown();
+        Assert.assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Test should 
complete within 30 seconds");
+
+        if (errors.get() > 0) {
+            Exception exception = firstException.get();
+            if (exception != null) {
+                System.err.println("First exception caught: " + 
exception.getMessage());
+                exception.printStackTrace();
+            }
+        }
+        Assert.assertEquals(errors.get(), 0, "No exceptions should occur 
during concurrent operations");
+    }
 }

Reply via email to