This is an automated email from the ASF dual-hosted git repository.
thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 98105943eef [improve][broker] Reduce cpu usage of InMemoryDelayedDeliveryTracker. (#24430)
98105943eef is described below
commit 98105943eefcb73b4ad1063ce4b22c3a2626f362
Author: Wenzhi Feng <[email protected]>
AuthorDate: Thu Jun 19 20:58:04 2025 +0800
[improve][broker] Reduce cpu usage of InMemoryDelayedDeliveryTracker. (#24430)
Co-authored-by: thetumbled <[email protected]>
---
.../broker/delayed/InMemoryDelayedDeliveryTracker.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index a48ed416138..900896cdf47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -30,6 +30,7 @@ import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
@@ -64,6 +65,9 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
// The bit count to trim to reduce memory occupation.
private final int timestampPrecisionBitCnt;
+ // Count of delayed messages in the tracker.
+ private final AtomicLong delayedMessagesCount = new AtomicLong(0);
+
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers
dispatcher, Timer timer,
long tickTimeMillis,
boolean
isDelayedDeliveryDeliverAtTimeStrict,
@@ -125,6 +129,8 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
delayedMessageMap.computeIfAbsent(timestamp, k -> new
Long2ObjectRBTreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
+ delayedMessagesCount.incrementAndGet();
+
updateTimer();
checkAndUpdateHighest(deliverAt);
@@ -183,6 +189,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
positions.add(PositionFactory.create(ledgerId,
entryId));
});
n -= cardinality;
+ delayedMessagesCount.addAndGet(-cardinality);
ledgerIdToDelete.add(ledgerId);
} else {
long[] entryIdsArray = entryIds.toArray();
@@ -190,6 +197,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
positions.add(PositionFactory.create(ledgerId,
entryIdsArray[i]));
entryIds.removeLong(entryIdsArray[i]);
}
+ delayedMessagesCount.addAndGet(-n);
n = 0;
}
if (n <= 0) {
@@ -221,14 +229,13 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
@Override
public CompletableFuture<Void> clear() {
this.delayedMessageMap.clear();
+ this.delayedMessagesCount.set(0);
return CompletableFuture.completedFuture(null);
}
@Override
public long getNumberOfDelayedMessages() {
- return delayedMessageMap.values().stream().mapToLong(
- ledgerMap -> ledgerMap.values().stream().mapToLong(
- Roaring64Bitmap::getLongCardinality).sum()).sum();
+ return delayedMessagesCount.get();
}
/**