Hi Wenzhi,

After upgrading our broker environment from version 3.0.x to 4.0.x, we observed a critical performance regression. Broker CPU usage has consistently been more than three times higher than before the upgrade, causing the broker to hit its allocated CPU limit.
The stack trace captured from the broker process during this high-utilization period is attached below for diagnosis:

```
"broker-topic-workers-OrderedExecutor-1-0" #51 [95] prio=5 os_prio=0 cpu=1678706.57ms elapsed=15712.43s tid=0x00007fa0da32b820 nid=95 runnable [0x00007fa0d6c00000]
   java.lang.Thread.State: RUNNABLE
	at java.util.stream.AbstractPipeline.wrapSink(java.base@21.0.7/Unknown Source)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
	at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
	at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
	at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
	at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
	at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
	at java.util.stream.ReferencePipeline$5$1.accept(java.base@21.0.7/Unknown Source)
	at java.util.Iterator.forEachRemaining(java.base@21.0.7/Unknown Source)
	at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
	at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
	at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
	at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
	at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
	at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
	at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
	at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.addMessage(InMemoryDelayedDeliveryTracker.java:128)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trackDelayedDelivery(PersistentDispatcherMultipleConsumers.java:1314)
	- locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:228)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:864)
	- locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:769)
	- eliminated <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:729)
	- locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.runWith(java.base@21.0.7/Unknown Source)
	at java.lang.Thread.run(java.base@21.0.7/Unknown Source)

"broker-topic-workers-OrderedExecutor-2-0" #52 [96] prio=5 os_prio=0 cpu=1797445.47ms elapsed=15712.43s tid=0x00007fa0d9419fc0 nid=96 runnable [0x00007fa0d6aff000]
   java.lang.Thread.State: RUNNABLE
	at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
	at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
	at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
	at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
	at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
	at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
	at java.util.stream.ReferencePipeline$5$1.accept(java.base@21.0.7/Unknown Source)
	at java.util.Iterator.forEachRemaining(java.base@21.0.7/Unknown Source)
	at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
	at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
	at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
	at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
	at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
	at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
	at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
	at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getScheduledMessages(InMemoryDelayedDeliveryTracker.java:217)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentDispatcherMultipleConsumers.java:1327)
	- locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:386)
	- locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:743)
	- locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.runWith(java.base@21.0.7/Unknown Source)
	at java.lang.Thread.run(java.base@21.0.7/Unknown Source)
```

The flame graph and stack trace I provided are the primary evidence we have captured for the
performance regression.

Here is the change that caused this issue:
https://github.com/apache/pulsar/pull/23611/files#diff-f159b4e262ff6213bba19d20e6dc01d07ea8c19f4675524e4ceb1470f456e8fcR229-R231

Every message added to `InMemoryDelayedDeliveryTracker` (via `addMessage()`) triggers `updateTimer()`, which calls `getNumberOfDelayedMessages()`. That method iterates over the entire nested map defined in `InMemoryDelayedDeliveryTracker`, so the cost of each call grows with the size of the map. As the second thread above shows, the same full iteration also runs on the read path via `getScheduledMessages()`.

```
protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>> delayedMessageMap =
        new Long2ObjectAVLTreeMap<>();
```

Regards
Penghui

On Wed, Jun 18, 2025 at 4:01 PM WenZhi Feng <thetumb...@apache.org> wrote:
> -1 until much clearer proof is shown.
>
> On 2025/06/18 22:26:26 PengHui Li wrote:
> > Hi team,
> >
> > I am writing to propose an urgent revert of PR #23611
> >
> > - https://github.com/apache/pulsar/pull/23611
> >
> > *Reason for Revert:*
> >
> > This PR has introduced a significant performance regression in the Pulsar
> > broker. The attached flame graph visually demonstrates increased CPU
> > utilization and time spent in the code paths related to
> > DelayedDeliveryTracker and stream operations.
> >
> > While the intention was to optimize memory usage, the current
> > implementation appears to have an adverse effect on CPU performance,
> > leading to overall degraded broker throughput and increased latency.
> >
> > *Impact:*
> >
> > This regression is impacting the stability and performance of our Pulsar
> > clusters, especially when you have large-scale delayed messages. Reverting
> > the change will allow us to restore the previous performance
> > characteristics while we investigate a more robust and performant
> > solution for DelayedDeliveryTracker memory optimization.
> >
> > *Proposed Action:*
> >
> > I propose to revert PR #23611 as soon as possible to reduce the risk to
> > other users, and we can then collectively work on a more thoroughly
> > tested and performant approach to optimize the DelayedDeliveryTracker
> > memory.
> >
> > Please let me know your thoughts, and if there are any immediate concerns
> > with this proposed revert.
> >
> > Regards,
> > Penghui
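The cost pattern described in the reply above (a full recount of the nested map on every `addMessage()`) can be illustrated with a small, self-contained Java sketch. It is hypothetical and simplified: `TreeMap` and `HashSet` stand in for fastutil's `Long2ObjectAVLTreeMap` and `Roaring64Bitmap`, the class, field, and method names are invented for illustration, and the nested stream sum only approximates the shape suggested by the stack trace at `InMemoryDelayedDeliveryTracker.java:231`. It assumes a worst case where every message lands in its own (deliverAt, ledgerId) bucket. The running counter is shown only for comparison, as one possible direction, not as the approach the project has adopted.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical stand-in for the nested index deliverAt -> ledgerId -> entryIds.
 * TreeMap/HashSet replace fastutil's Long2ObjectAVLTreeMap and Roaring64Bitmap
 * so the sketch runs on the JDK alone.
 */
final class DelayedIndexCostSketch {
    private final TreeMap<Long, TreeMap<Long, Set<Long>>> delayedMessageMap = new TreeMap<>();
    // Hypothetical running counter; a real one must also be decremented on removal.
    private long cachedCount = 0;
    // Total number of inner sets visited by all full scans so far.
    long bucketsVisitedByScans = 0;

    /** Full scan over every bucket, similar in shape to the nested stream sum in the stack trace. */
    long countByScan() {
        return delayedMessageMap.values().stream()
                .mapToLong(ledgerMap -> ledgerMap.values().stream()
                        .mapToLong(entries -> {
                            bucketsVisitedByScans++;
                            return entries.size();
                        })
                        .sum())
                .sum();
    }

    /** Adds one message; scanOnAdd mimics a recount on every add (addMessage -> updateTimer). */
    void addMessage(long deliverAt, long ledgerId, long entryId, boolean scanOnAdd) {
        boolean added = delayedMessageMap
                .computeIfAbsent(deliverAt, k -> new TreeMap<>())
                .computeIfAbsent(ledgerId, k -> new HashSet<>())
                .add(entryId);
        if (added) {
            cachedCount++; // O(1) bookkeeping per new message
        }
        if (scanOnAdd) {
            countByScan(); // cost proportional to the number of buckets on every add
        }
    }

    /** O(1) count read from the running counter. */
    long countCached() {
        return cachedCount;
    }

    public static void main(String[] args) {
        int n = 2_000;
        DelayedIndexCostSketch scanning = new DelayedIndexCostSketch();
        DelayedIndexCostSketch counting = new DelayedIndexCostSketch();
        for (int i = 0; i < n; i++) {
            // Worst case: each message gets its own deliverAt bucket on a single ledger.
            scanning.addMessage(1_000L + i, 7L, i, true);
            counting.addMessage(1_000L + i, 7L, i, false);
        }
        // Read the work counter before the final scan below adds to it.
        long visits = scanning.bucketsVisitedByScans; // n * (n + 1) / 2
        System.out.println("buckets visited by per-add scans: " + visits);
        System.out.println("count via full scan: " + scanning.countByScan());
        System.out.println("count via counter:   " + counting.countCached());
    }
}
```

With n = 2,000 the per-add scans visit 2,001,000 buckets in total, that is n(n + 1)/2, while both strategies agree on a count of 2,000. In this sketch the total scan work grows quadratically with the number of adds, whereas the counter costs O(1) per add.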