Hi WenZhi,

The main problem is that this kind of optimization should never have been
cherry-picked into a stable branch.

We should be much stricter about backporting only bug fixes and security fixes
into stable branches. Every other improvement or feature will roll into the
next feature release.


--
Matteo Merli
<matteo.me...@gmail.com>


On Wed, Jun 18, 2025 at 4:42 PM WenZhi Feng <thetumb...@apache.org> wrote:

> Hi Penghui,
> Thanks for your reply and for the evidence. We can fix this problem by
> improving the implementation of the `getNumberOfDelayedMessages()` method,
> which is a simple patch. I can help with this.
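A minimal sketch of what such a patch could look like. It keeps a running counter next to the nested map and updates it on every insert and removal, so `getNumberOfDelayedMessages()` becomes O(1) and no longer walks the map.

Everything here is hypothetical and is not the actual Pulsar change:
- The names (`CountingTrackerSketch`, `pollDueMessages`, the fields) are invented for illustration.
- The layout (delivery timestamp → ledger ID → entry IDs) is assumed.
- `java.util.TreeMap` and `BitSet` stand in for fastutil's `Long2ObjectAVLTreeMap` and `Roaring64Bitmap`.

```java
import java.util.BitSet;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch: an O(1) getNumberOfDelayedMessages() backed by a running
 * counter. TreeMap/BitSet stand in for Long2ObjectAVLTreeMap/Roaring64Bitmap.
 */
class CountingTrackerSketch {
    // Assumed layout: deliverAt timestamp -> ledgerId -> set of entryIds
    private final NavigableMap<Long, NavigableMap<Long, BitSet>> delayedMessageMap = new TreeMap<>();
    private long delayedMessagesCount = 0;

    void addMessage(long ledgerId, long entryId, long deliverAt) {
        BitSet entryIds = delayedMessageMap
                .computeIfAbsent(deliverAt, k -> new TreeMap<>())
                .computeIfAbsent(ledgerId, k -> new BitSet());
        // Only count entries that were not already tracked (BitSet is int-indexed)
        if (!entryIds.get((int) entryId)) {
            entryIds.set((int) entryId);
            delayedMessagesCount++;
        }
    }

    /** Hypothetical helper: removes everything due at or before cutoffTime and returns how many. */
    int pollDueMessages(long cutoffTime) {
        // Live view of the backing map: only the due part is walked, not the whole map
        NavigableMap<Long, NavigableMap<Long, BitSet>> due = delayedMessageMap.headMap(cutoffTime, true);
        int polled = 0;
        for (NavigableMap<Long, BitSet> ledgers : due.values()) {
            for (BitSet entryIds : ledgers.values()) {
                polled += entryIds.cardinality();
            }
        }
        due.clear(); // clearing the view removes these keys from delayedMessageMap
        delayedMessagesCount -= polled;
        return polled;
    }

    long getNumberOfDelayedMessages() {
        return delayedMessagesCount; // O(1): no traversal of the nested map
    }
}
```

The check before `set()` matters. Setting a bit that is already present is a no-op, so incrementing unconditionally would make the counter drift upward on duplicate adds. The trade-off of this design is that every code path that mutates the map must also keep the counter in sync.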
>
> thanks,
> Wenzhi Feng.
>
> On 2025/06/18 23:26:08 PengHui Li wrote:
> > Hi Wenzhi,
> >
> > After upgrading our broker environment from version 3.0.x to 4.0.x,
> > we observed a critical performance regression.
> > The broker's CPU usage has consistently been more than three times
> > its pre-upgrade level, causing it to hit its allocated CPU limit.
> >
> > Stack traces captured from the process during this high-utilization
> > period are attached below for diagnostics.
> >
> > ```
> > "broker-topic-workers-OrderedExecutor-1-0" #51 [95] prio=5 os_prio=0 cpu=1678706.57ms elapsed=15712.43s tid=0x00007fa0da32b820 nid=95 runnable  [0x00007fa0d6c00000]
> >    java.lang.Thread.State: RUNNABLE
> >         at java.util.stream.AbstractPipeline.wrapSink(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
> >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
> >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
> >         at java.util.stream.ReferencePipeline$5$1.accept(java.base@21.0.7/Unknown Source)
> >         at java.util.Iterator.forEachRemaining(java.base@21.0.7/Unknown Source)
> >         at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
> >         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
> >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
> >         at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
> >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.addMessage(InMemoryDelayedDeliveryTracker.java:128)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trackDelayedDelivery(PersistentDispatcherMultipleConsumers.java:1314)
> >         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
> >         at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:228)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:864)
> >         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:769)
> >         - eliminated <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:729)
> >         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
> >         at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
> >         at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
> >         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> >         at java.lang.Thread.runWith(java.base@21.0.7/Unknown Source)
> >         at java.lang.Thread.run(java.base@21.0.7/Unknown Source)
> >
> > "broker-topic-workers-OrderedExecutor-2-0" #52 [96] prio=5 os_prio=0 cpu=1797445.47ms elapsed=15712.43s tid=0x00007fa0d9419fc0 nid=96 runnable  [0x00007fa0d6aff000]
> >    java.lang.Thread.State: RUNNABLE
> >         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
> >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
> >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
> >         at java.util.stream.ReferencePipeline$5$1.accept(java.base@21.0.7/Unknown Source)
> >         at java.util.Iterator.forEachRemaining(java.base@21.0.7/Unknown Source)
> >         at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
> >         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
> >         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
> >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
> >         at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
> >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getScheduledMessages(InMemoryDelayedDeliveryTracker.java:217)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentDispatcherMultipleConsumers.java:1327)
> >         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:386)
> >         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:743)
> >         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
> >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
> >         at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
> >         at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
> >         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> >         at java.lang.Thread.runWith(java.base@21.0.7/Unknown Source)
> >         at java.lang.Thread.run(java.base@21.0.7/Unknown Source)
> > ```
> > The flame graph and the stack traces I provided are the primary evidence
> > we have captured for the performance regression.
> >
> >
> > https://github.com/apache/pulsar/pull/23611/files#diff-f159b4e262ff6213bba19d20e6dc01d07ea8c19f4675524e4ceb1470f456e8fcR229-R231
> > Here is the change that caused this issue. Every message added to
> > `InMemoryDelayedDeliveryTracker` goes through `updateTimer()`, which calls
> > `getNumberOfDelayedMessages()`, and so does every `getScheduledMessages()`
> > call. That method iterates over the whole nested map defined in
> > `InMemoryDelayedDeliveryTracker`:
> >
> > ```
> >
> > protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
> >         delayedMessageMap = new Long2ObjectAVLTreeMap<>();
> >
> > ```
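A minimal, hypothetical model of the pattern in the stack traces, to make the cost concrete.

Assumptions and stand-ins:
- The outer key of `delayedMessageMap` is taken to be the delivery timestamp and the inner key the ledger ID. This is an assumption, not verified against the PR.
- `NestedCountSketch` and its members are illustrative names.
- `java.util.TreeMap` and `BitSet` stand in for fastutil's `Long2ObjectAVLTreeMap` and `Roaring64Bitmap`.

Because every add recounts every bitmap, n adds with distinct delivery times visit n(n+1)/2 bitmaps in total.

```java
import java.util.BitSet;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical model of the costly pattern: a nested map whose size is
 * recomputed with nested streams on every add.
 */
class NestedCountSketch {
    // Assumed layout: deliverAt timestamp -> ledgerId -> set of entryIds
    final NavigableMap<Long, NavigableMap<Long, BitSet>> delayedMessageMap = new TreeMap<>();
    long bitmapsVisited = 0; // instrumentation: total bitmaps touched by all counts

    void addMessage(long ledgerId, long entryId, long deliverAt) {
        delayedMessageMap
                .computeIfAbsent(deliverAt, k -> new TreeMap<>())
                .computeIfAbsent(ledgerId, k -> new BitSet())
                .set((int) entryId); // BitSet is int-indexed; fine for a sketch
        // Mirrors addMessage -> updateTimer -> getNumberOfDelayedMessages in the traces
        getNumberOfDelayedMessages();
    }

    long getNumberOfDelayedMessages() {
        // Walks every timestamp, every ledger and every bitmap on each call
        return delayedMessageMap.values().stream()
                .mapToLong(ledgerMap -> ledgerMap.values().stream()
                        .mapToLong(bitmap -> {
                            bitmapsVisited++;
                            return bitmap.cardinality();
                        })
                        .sum())
                .sum();
    }

    public static void main(String[] args) {
        NestedCountSketch tracker = new NestedCountSketch();
        for (int i = 0; i < 1_000; i++) {
            tracker.addMessage(1L, i, 1_000L + i); // distinct timestamps: one bitmap each
        }
        System.out.println("messages = " + tracker.getNumberOfDelayedMessages());
        System.out.println("bitmaps visited = " + tracker.bitmapsVisited);
    }
}
```

Running `main` prints `messages = 1000` and `bitmaps visited = 501500`. That is 500,500 visits from the 1,000 adds, plus 1,000 for the final count. The real cost also depends on how many distinct timestamps and ledgers the tracker holds at once.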
> >
> > Regards
> >
> > Penghui
> >
> >
> > On Wed, Jun 18, 2025 at 4:01 PM WenZhi Feng <thetumb...@apache.org> wrote:
> >
> > > -1 until much clearer proof is shown.
> > >
> > > On 2025/06/18 22:26:26 PengHui Li wrote:
> > > > Hi team,
> > > >
> > > > I am writing to propose an urgent revert of PR #23611:
> > > >
> > > > - https://github.com/apache/pulsar/pull/23611
> > > >
> > > > *Reason for Revert:*
> > > >
> > > > This PR has introduced a significant performance regression in the Pulsar
> > > > broker. The attached flame graph shows increased CPU utilization and time
> > > > spent in the code paths related to DelayedDeliveryTracker and stream
> > > > operations.
> > > >
> > > > While the intention was to optimize memory usage, the current
> > > > implementation appears to have an adverse effect on CPU performance,
> > > > degrading overall broker throughput and increasing latency.
> > > >
> > > > *Impact:*
> > > >
> > > > This regression is impacting the stability and performance of our Pulsar
> > > > clusters, especially those with a large volume of delayed messages.
> > > > Reverting the change will allow us to restore the previous performance
> > > > characteristics while we investigate a more robust and performant solution
> > > > for DelayedDeliveryTracker memory optimization.
> > > >
> > > > *Proposed Action:*
> > > >
> > > > I propose to revert PR #23611 as soon as possible to reduce the risk to
> > > > other users. We can then collectively work on a more thoroughly tested and
> > > > performant approach to optimizing DelayedDeliveryTracker memory usage.
> > > >
> > > > Please let me know your thoughts, and whether there are any immediate
> > > > concerns with this proposed revert.
> > > >
> > > > Regards,
> > > > Penghui
> > > >
> > >
> >
>
