This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 98105943eef [improve][broker] Reduce cpu usage of InMemoryDelayedDeliveryTracker. (#24430)
98105943eef is described below

commit 98105943eefcb73b4ad1063ce4b22c3a2626f362
Author: Wenzhi Feng <[email protected]>
AuthorDate: Thu Jun 19 20:58:04 2025 +0800

    [improve][broker] Reduce cpu usage of InMemoryDelayedDeliveryTracker. (#24430)
    
    Co-authored-by: thetumbled <[email protected]>
---
 .../broker/delayed/InMemoryDelayedDeliveryTracker.java      | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index a48ed416138..900896cdf47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -30,6 +30,7 @@ import java.time.Clock;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
@@ -64,6 +65,9 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
     // The bit count to trim to reduce memory occupation.
     private final int timestampPrecisionBitCnt;
 
+    // Count of delayed messages in the tracker.
+    private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+
     InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
                                    long tickTimeMillis,
                                    boolean isDelayedDeliveryDeliverAtTimeStrict,
@@ -125,6 +129,8 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
         delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
                 .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
                 .add(entryId);
+        delayedMessagesCount.incrementAndGet();
+
         updateTimer();
 
         checkAndUpdateHighest(deliverAt);
@@ -183,6 +189,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
                         positions.add(PositionFactory.create(ledgerId, entryId));
                     });
                     n -= cardinality;
+                    delayedMessagesCount.addAndGet(-cardinality);
                     ledgerIdToDelete.add(ledgerId);
                 } else {
                     long[] entryIdsArray = entryIds.toArray();
@@ -190,6 +197,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
                         positions.add(PositionFactory.create(ledgerId, entryIdsArray[i]));
                         entryIds.removeLong(entryIdsArray[i]);
                     }
+                    delayedMessagesCount.addAndGet(-n);
                     n = 0;
                 }
                 if (n <= 0) {
@@ -221,14 +229,13 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
     @Override
     public CompletableFuture<Void> clear() {
         this.delayedMessageMap.clear();
+        this.delayedMessagesCount.set(0);
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public long getNumberOfDelayedMessages() {
-        return delayedMessageMap.values().stream().mapToLong(
-                ledgerMap -> ledgerMap.values().stream().mapToLong(
-                        Roaring64Bitmap::getLongCardinality).sum()).sum();
+        return delayedMessagesCount.get();
     }
 
     /**

Reply via email to