All the tests passed, and I have merged the PR. Thanks, Penghui!
On Wed, Jun 18, 2025 at 7:40 PM PengHui Li <peng...@apache.org> wrote:

> Here is the PR https://github.com/apache/pulsar/pull/24429 to revert
> https://github.com/apache/pulsar/pull/23611 and a related PR.
>
> Thanks,
> Penghui

On Wed, Jun 18, 2025 at 6:24 PM WenZhi Feng <thetumb...@apache.org> wrote:

> I'm also fine with reverting only from branch-4.0.
>
> Thanks,
> Wenzhi Feng.

On 2025/06/19 00:36:18 PengHui Li wrote:

> > the main problem is that this kind of optimization should never have been
> > cherry-picked into a stable branch.
>
> > We should be way more strict in only backporting bug-fixes / security-fixes
> > into stable branches. Every other improvement or feature will roll into the
> > next feature release.
>
> +1, I'm fine with reverting only from branch-4.0 and continuing to work on
> the fix on the master branch.
>
> Thanks,
> penghui

On Wed, Jun 18, 2025 at 5:06 PM Matteo Merli <matteo.me...@gmail.com> wrote:

> Hi WenZhi,
>
> the main problem is that this kind of optimization should never have been
> cherry-picked into a stable branch.
>
> We should be way more strict in only backporting bug-fixes / security-fixes
> into stable branches. Every other improvement or feature will roll into the
> next feature release.
>
> --
> Matteo Merli
> <matteo.me...@gmail.com>

On Wed, Jun 18, 2025 at 4:42 PM WenZhi Feng <thetumb...@apache.org> wrote:

> Hi Penghui,
> Thanks for your reply and the evidence you provided. We can fix this problem
> by improving the implementation of the method `getNumberOfDelayedMessages()`,
> which is a simple patch. I can help do this.
>
> Thanks,
> Wenzhi Feng.
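WenZhi's suggestion above is to make `getNumberOfDelayedMessages()` cheap; the thread does not show that patch. One common approach, sketched here purely as an assumption, is to keep a running counter that is updated on every add and removal, so reading the count is O(1) instead of a walk over the nested map. JDK `TreeMap`/`BitSet` stand in for fastutil's `Long2ObjectAVLTreeMap` and `Roaring64Bitmap`, and the class and method names are hypothetical, not Pulsar's API.

```java
import java.util.BitSet;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: a delayed-message index that maintains its size
// incrementally. TreeMap/BitSet stand in for fastutil/RoaringBitmap types;
// BitSet only supports int entry IDs, unlike Roaring64Bitmap.
class CountingDelayedTracker {
    // deliverAt timestamp -> ledgerId -> set of entryIds
    private final NavigableMap<Long, NavigableMap<Long, BitSet>> delayedMessageMap = new TreeMap<>();

    // Running total, updated on every mutation so reads never scan the map.
    private long delayedMessagesCount = 0;

    // Returns false (and leaves the counter unchanged) for duplicates.
    boolean addMessage(long deliverAt, long ledgerId, int entryId) {
        BitSet entries = delayedMessageMap
                .computeIfAbsent(deliverAt, k -> new TreeMap<>())
                .computeIfAbsent(ledgerId, k -> new BitSet());
        if (entries.get(entryId)) {
            return false; // already tracked: must not be counted twice
        }
        entries.set(entryId);
        delayedMessagesCount++;
        return true;
    }

    // Removes every message due at or before cutoff and returns how many were removed.
    long removeDueAtOrBefore(long cutoff) {
        NavigableMap<Long, NavigableMap<Long, BitSet>> due = delayedMessageMap.headMap(cutoff, true);
        long removed = 0;
        for (NavigableMap<Long, BitSet> ledgers : due.values()) {
            for (BitSet entries : ledgers.values()) {
                removed += entries.cardinality();
            }
        }
        due.clear(); // clearing the head-map view removes those buckets from the backing map
        delayedMessagesCount -= removed;
        return removed;
    }

    // O(1): no iteration over delayedMessageMap.
    long getNumberOfDelayedMessages() {
        return delayedMessagesCount;
    }
}
```

The trade-off is that every mutation path (adds, duplicates, removals, clears) must keep the counter in sync; a missed path makes the reported count drift silently, so such a change would need tests covering each path.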
On 2025/06/18 23:26:08 PengHui Li wrote:

> Hi Wenzhi,
>
> After upgrading our broker environment from version 3.0.x to 4.0.x,
> we observed a critical performance regression.
> The broker's CPU usage has consistently been more than three times
> higher than pre-upgrade levels, causing it to hit its allocated CPU limit.
>
> The stacktrace from the process during this high-utilization period is
> attached below for diagnostics.
>
> ```
> "broker-topic-workers-OrderedExecutor-1-0" #51 [95] prio=5 os_prio=0 cpu=1678706.57ms elapsed=15712.43s tid=0x00007fa0da32b820 nid=95 runnable [0x00007fa0d6c00000]
>    java.lang.Thread.State: RUNNABLE
>         at java.util.stream.AbstractPipeline.wrapSink(java.base@21.0.7/Unknown Source)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
>         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
>         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
>         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
>         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
>         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
>         at java.util.stream.ReferencePipeline$5$1.accept(java.base@21.0.7/Unknown Source)
>         at java.util.Iterator.forEachRemaining(java.base@21.0.7/Unknown Source)
>         at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
>         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
>         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
>         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
>         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
>         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
>         at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
>         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.addMessage(InMemoryDelayedDeliveryTracker.java:128)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trackDelayedDelivery(PersistentDispatcherMultipleConsumers.java:1314)
>         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>         at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:228)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:864)
>         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:769)
>         - eliminated <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:729)
>         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
>         at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
>         at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
>         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.runWith(java.base@21.0.7/Unknown Source)
>         at java.lang.Thread.run(java.base@21.0.7/Unknown Source)
>
> "broker-topic-workers-OrderedExecutor-2-0" #52 [96] prio=5 os_prio=0 cpu=1797445.47ms elapsed=15712.43s tid=0x00007fa0d9419fc0 nid=96 runnable [0x00007fa0d6aff000]
>    java.lang.Thread.State: RUNNABLE
>         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
>         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
>         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
>         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
>         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
>         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
>         at java.util.stream.ReferencePipeline$5$1.accept(java.base@21.0.7/Unknown Source)
>         at java.util.Iterator.forEachRemaining(java.base@21.0.7/Unknown Source)
>         at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
>         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
>         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
>         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
>         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
>         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
>         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
>         at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
>         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getScheduledMessages(InMemoryDelayedDeliveryTracker.java:217)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentDispatcherMultipleConsumers.java:1327)
>         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:386)
>         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:743)
>         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
>         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
>         at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
>         at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
>         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.runWith(java.base@21.0.7/Unknown Source)
>         at java.lang.Thread.run(java.base@21.0.7/Unknown Source)
> ```
>
> The flame graph and stack trace I provided are the primary evidence
> we've captured for the performance regression.
>
> Here is the change that caused this issue:
> https://github.com/apache/pulsar/pull/23611/files#diff-f159b4e262ff6213bba19d20e6dc01d07ea8c19f4675524e4ceb1470f456e8fcR229-R231
>
> Each message added to the DelayedDeliveryTracker calls the
> `getNumberOfDelayedMessages()` method, which iterates over the map defined
> in InMemoryDelayedDeliveryTracker:
>
> ```
> protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
>         delayedMessageMap = new Long2ObjectAVLTreeMap<>();
> ```
>
> Regards,
> Penghui

On Wed, Jun 18, 2025 at 4:01 PM WenZhi Feng <thetumb...@apache.org> wrote:

> -1 until much clearer proof is shown.
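Penghui's analysis of PR #23611 above (each message added to the tracker triggers `getNumberOfDelayedMessages()`, which walks the nested `delayedMessageMap`) can be illustrated with a small model. This is a hypothetical sketch, not Pulsar's code: JDK `TreeMap`/`BitSet` stand in for fastutil's `Long2ObjectAVLTreeMap` and `Roaring64Bitmap`, and the method names only mirror the stack trace for readability. Because each insert triggers a full scan, inserting n messages into distinct delivery-time buckets costs 1 + 2 + ... + n = n(n+1)/2 bitmap visits in total.

```java
import java.util.BitSet;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the pattern described in the thread; names are
// illustrative. BitSet uses int entry IDs, unlike Roaring64Bitmap.
class ScanningDelayedTracker {
    // deliverAt timestamp -> ledgerId -> set of entryIds
    private final NavigableMap<Long, NavigableMap<Long, BitSet>> delayedMessageMap = new TreeMap<>();

    // Instrumentation: total number of per-ledger bitmaps visited by all scans.
    long bitmapsVisited = 0;

    // Last count seen by updateTimer(); a real tracker would use it when
    // deciding how to (re)schedule its timer.
    long lastObservedCount = 0;

    void addMessage(long deliverAt, long ledgerId, int entryId) {
        delayedMessageMap
                .computeIfAbsent(deliverAt, k -> new TreeMap<>())
                .computeIfAbsent(ledgerId, k -> new BitSet())
                .set(entryId);
        // Mirrors the reported call chain: addMessage -> updateTimer -> full count.
        updateTimer();
    }

    private void updateTimer() {
        lastObservedCount = getNumberOfDelayedMessages();
    }

    // Full scan of every bucket: O(number of bitmaps) on every call.
    long getNumberOfDelayedMessages() {
        long total = 0;
        for (NavigableMap<Long, BitSet> ledgers : delayedMessageMap.values()) {
            for (BitSet entries : ledgers.values()) {
                bitmapsVisited++;
                total += entries.cardinality();
            }
        }
        return total;
    }
}
```

In this model, adding 1,000 messages with distinct delivery times and the same ledger drives `bitmapsVisited` to 500,500, whereas a single count at the end would visit only 1,000 bitmaps; that quadratic growth matches the CPU profile described above.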
On 2025/06/18 22:26:26 PengHui Li wrote:

> Hi team,
>
> I am writing to propose an urgent revert of PR #23611:
>
> - https://github.com/apache/pulsar/pull/23611
>
> *Reason for Revert:*
>
> This PR has introduced a significant performance regression in the Pulsar
> broker. The attached flame graph visually demonstrates increased CPU
> utilization and time spent in the code paths related to
> DelayedDeliveryTracker and stream operations.
>
> While the intention was to optimize memory usage, the current
> implementation appears to have an adverse effect on CPU performance,
> leading to degraded overall broker throughput and increased latency.
>
> *Impact:*
>
> This regression is impacting the stability and performance of our Pulsar
> clusters, especially those handling large volumes of delayed messages.
> Reverting the change will allow us to restore the previous performance
> characteristics while we investigate a more robust and performant solution
> for DelayedDeliveryTracker memory optimization.
>
> *Proposed Action:*
>
> I propose to revert PR #23611 as soon as possible to reduce the risk to
> other users, and we can then collectively work on a more thoroughly tested
> and performant approach to optimize the DelayedDeliveryTracker memory.
>
> Please let me know your thoughts, and whether there are any immediate
> concerns with this proposed revert.
>
> Regards,
> Penghui