All the tests have passed and I have merged the PR.

Thanks
Penghui

On Wed, Jun 18, 2025 at 7:40 PM PengHui Li <peng...@apache.org> wrote:

> Here is the PR https://github.com/apache/pulsar/pull/24429 to
> revert https://github.com/apache/pulsar/pull/23611 and a related PR
>
> Thanks,
> Penghui
>
> On Wed, Jun 18, 2025 at 6:24 PM WenZhi Feng <thetumb...@apache.org> wrote:
>
>> I'm good to only revert from branch-4.0 too.
>>
>> Thanks,
>> Wenzhi Feng.
>>
>>
>> On 2025/06/19 00:36:18 PengHui Li wrote:
>> > > the main problem is that this kind of optimization should never have been
>> > > cherry-picked into a stable branch.
>> >
>> > > We should be way more strict in only backporting bug-fixes / security-fixes
>> > > into stable branches. Every other improvement or feature will roll into the
>> > > next feature release.
>> >
>> > +1, I'm good to only revert from branch-4.0 and keep working on the fix
>> > on the master branch
>> >
>> > Thanks,
>> > penghui
>> >
>> > On Wed, Jun 18, 2025 at 5:06 PM Matteo Merli <matteo.me...@gmail.com> wrote:
>> >
>> > > Hi WenZhi,
>> > >
>> > > the main problem is that this kind of optimization should never have been
>> > > cherry-picked into a stable branch.
>> > >
>> > > We should be way more strict in only backporting bug-fixes / security-fixes
>> > > into stable branches. Every other improvement or feature will roll into the
>> > > next feature release.
>> > >
>> > >
>> > > --
>> > > Matteo Merli
>> > > <matteo.me...@gmail.com>
>> > >
>> > >
>> > > On Wed, Jun 18, 2025 at 4:42 PM WenZhi Feng <thetumb...@apache.org> wrote:
>> > >
>> > > > Hi Penghui,
>> > > > Thanks for your reply and the evidence provided. We can fix this problem
>> > > > by improving the implementation of method `getNumberOfDelayedMessages()`,
>> > > > which is a simple patch. I can help to do this.
>> > > >
>> > > > thanks,
>> > > > Wenzhi Feng.
>> > > >
>> > > > On 2025/06/18 23:26:08 PengHui Li wrote:
>> > > > > Hi Wenzhi,
>> > > > >
>> > > > > After upgrading our broker environment from version 3.0.x to 4.0.x,
>> > > > > we observed a critical performance regression.
>> > > > > The broker's CPU usage has consistently been more than three times
>> > > > > higher than pre-upgrade levels, causing it to hit its allocated CPU limit.
>> > > > >
>> > > > > The stacktrace from the process during this high-utilization period is
>> > > > > attached below for diagnostics.
>> > > > >
>> > > > > ```
>> > > > > "broker-topic-workers-OrderedExecutor-1-0" #51 [95] prio=5 os_prio=0 cpu=1678706.57ms elapsed=15712.43s tid=0x00007fa0da32b820 nid=95 runnable  [0x00007fa0d6c00000]
>> > > > >    java.lang.Thread.State: RUNNABLE
>> > > > >         at java.util.stream.AbstractPipeline.wrapSink(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
>> > > > >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
>> > > > >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
>> > > > >         at java.util.stream.ReferencePipeline$5$1.accept(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.Iterator.forEachRemaining(java.base@21.0.7/Unknown Source)
>> > > > >         at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
>> > > > >         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
>> > > > >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
>> > > > >         at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
>> > > > >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.addMessage(InMemoryDelayedDeliveryTracker.java:128)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trackDelayedDelivery(PersistentDispatcherMultipleConsumers.java:1314)
>> > > > >         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>> > > > >         at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:228)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:864)
>> > > > >         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:769)
>> > > > >         - eliminated <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:729)
>> > > > >         - locked <0x00000000c21d88b8> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
>> > > > >         at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
>> > > > >         at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
>> > > > >         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>> > > > >         at java.lang.Thread.runWith(java.base@21.0.7/Unknown Source)
>> > > > >         at java.lang.Thread.run(java.base@21.0.7/Unknown Source)
>> > > > >
>> > > > > "broker-topic-workers-OrderedExecutor-2-0" #52 [96] prio=5 os_prio=0 cpu=1797445.47ms elapsed=15712.43s tid=0x00007fa0d9419fc0 nid=96 runnable  [0x00007fa0d6aff000]
>> > > > >    java.lang.Thread.State: RUNNABLE
>> > > > >         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
>> > > > >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.lambda$getNumberOfDelayedMessages$3(InMemoryDelayedDeliveryTracker.java:231)
>> > > > >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker$$Lambda/0x0000000100e73078.applyAsLong(Unknown Source)
>> > > > >         at java.util.stream.ReferencePipeline$5$1.accept(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.Iterator.forEachRemaining(java.base@21.0.7/Unknown Source)
>> > > > >         at it.unimi.dsi.fastutil.objects.ObjectSpliterators$SpliteratorFromIterator.forEachRemaining(ObjectSpliterators.java:1158)
>> > > > >         at java.util.stream.AbstractPipeline.copyInto(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.AbstractPipeline.wrapAndCopyInto(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.AbstractPipeline.evaluate(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.LongPipeline.reduce(java.base@21.0.7/Unknown Source)
>> > > > >         at java.util.stream.LongPipeline.sum(java.base@21.0.7/Unknown Source)
>> > > > >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getNumberOfDelayedMessages(InMemoryDelayedDeliveryTracker.java:231)
>> > > > >         at org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker.updateTimer(AbstractDelayedDeliveryTracker.java:91)
>> > > > >         at org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker.getScheduledMessages(InMemoryDelayedDeliveryTracker.java:217)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentDispatcherMultipleConsumers.java:1327)
>> > > > >         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:386)
>> > > > >         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.handleSendingMessagesAndReadingMore(PersistentDispatcherMultipleConsumers.java:743)
>> > > > >         - locked <0x00000000b8cd7968> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$9(PersistentDispatcherMultipleConsumers.java:719)
>> > > > >         at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers$$Lambda/0x0000000100c0f440.run(Unknown Source)
>> > > > >         at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128)
>> > > > >         at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99)
>> > > > >         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>> > > > >         at java.lang.Thread.runWith(java.base@21.0.7/Unknown Source)
>> > > > >         at java.lang.Thread.run(java.base@21.0.7/Unknown Source)
>> > > > > ```
>> > > > > The flamegraph and stacktrace I provided are the primary evidence
>> > > > > we've captured for the performance regression.
>> > > > >
>> > > > > https://github.com/apache/pulsar/pull/23611/files#diff-f159b4e262ff6213bba19d20e6dc01d07ea8c19f4675524e4ceb1470f456e8fcR229-R231
>> > > > > Here is the change that caused this issue. Each message added to the
>> > > > > DelayedDeliveryTracker calls the `getNumberOfDelayedMessages()` method,
>> > > > > which iterates over the map defined in InMemoryDelayedDeliveryTracker:
>> > > > >
>> > > > > ```
>> > > > >
>> > > > > protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
>> > > > >         delayedMessageMap = new Long2ObjectAVLTreeMap<>();
>> > > > >
>> > > > > ```
>> > > > >
>> > > > > Regards
>> > > > >
>> > > > > Penghui
>> > > > >
>> > > > >
>> > > > > On Wed, Jun 18, 2025 at 4:01 PM WenZhi Feng <thetumb...@apache.org> wrote:
>> > > > >
>> > > > > > -1 until much clearer proof is shown.
>> > > > > >
>> > > > > > On 2025/06/18 22:26:26 PengHui Li wrote:
>> > > > > > > Hi team,
>> > > > > > >
>> > > > > > > I am writing to propose an urgent revert of PR #23611
>> > > > > > >
>> > > > > > > - https://github.com/apache/pulsar/pull/23611
>> > > > > > >
>> > > > > > > *Reason for Revert:*
>> > > > > > >
>> > > > > > > This PR has introduced a significant performance regression in the
>> > > > > > > Pulsar broker. The attached flame graph visually demonstrates increased
>> > > > > > > CPU utilization and time spent in the code paths related to
>> > > > > > > DelayedDeliveryTracker and stream operations.
>> > > > > > >
>> > > > > > > While the intention was to optimize memory usage, the current
>> > > > > > > implementation appears to have an adverse effect on CPU performance,
>> > > > > > > leading to overall degraded broker throughput and increased latency.
>> > > > > > >
>> > > > > > > *Impact:*
>> > > > > > >
>> > > > > > > This regression is impacting the stability and performance of our
>> > > > > > > Pulsar clusters, especially when you have large-scale delayed messages.
>> > > > > > > Reverting the change will allow us to restore the previous performance
>> > > > > > > characteristics while we investigate a more robust and performant
>> > > > > > > solution for DelayedDeliveryTracker memory optimization.
>> > > > > > >
>> > > > > > > *Proposed Action:*
>> > > > > > >
>> > > > > > > I propose to revert PR #23611 as soon as possible to reduce the risk to
>> > > > > > > other users, and we can then collectively work on a more thoroughly
>> > > > > > > tested and performant approach to optimize the DelayedDeliveryTracker
>> > > > > > > memory.
>> > > > > > >
>> > > > > > > Please let me know your thoughts, and if there are any immediate concerns
>> > > > > > > with this proposed revert.
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Penghui
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
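
For readers following the thread: the regression described above comes from
`getNumberOfDelayedMessages()` walking the whole nested map each time a message
is added. The sketch below illustrates that cost pattern next to one possible
alternative, a running counter kept in sync on every mutation. It is not Pulsar
code and not the actual patch: the class `DelayedCountSketch` and its methods
are hypothetical, and JDK `TreeMap` and `BitSet` stand in for the fastutil
`Long2ObjectSortedMap` and `Roaring64Bitmap` types named in the thread.

```java
import java.util.BitSet;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical, simplified stand-in for an in-memory delayed-message tracker.
 * Assumption: deliverAt timestamp -> ledger id -> set of entry ids.
 */
final class DelayedCountSketch {
    private final TreeMap<Long, TreeMap<Long, BitSet>> delayedMessageMap = new TreeMap<>();

    // Running total, adjusted whenever the map changes.
    private long delayedMessagesCount = 0;

    /** Adds one message; returns false if it was already tracked. */
    boolean addMessage(long deliverAt, long ledgerId, int entryId) {
        BitSet entries = delayedMessageMap
                .computeIfAbsent(deliverAt, k -> new TreeMap<>())
                .computeIfAbsent(ledgerId, k -> new BitSet());
        if (entries.get(entryId)) {
            return false;
        }
        entries.set(entryId);
        delayedMessagesCount++;
        return true;
    }

    /** Walks every bucket on each call, like the nested stream sums in the stack trace. */
    long countByIteration() {
        return delayedMessageMap.values().stream()
                .mapToLong(ledgers -> ledgers.values().stream()
                        .mapToLong(BitSet::cardinality)
                        .sum())
                .sum();
    }

    /** Constant-time read of the running total. */
    long countByCounter() {
        return delayedMessagesCount;
    }

    /** Removes everything due at or before cutoff and returns how many messages were removed. */
    long removeUpTo(long cutoff) {
        NavigableMap<Long, TreeMap<Long, BitSet>> due = delayedMessageMap.headMap(cutoff, true);
        long removed = 0;
        for (TreeMap<Long, BitSet> ledgers : due.values()) {
            for (BitSet entries : ledgers.values()) {
                removed += entries.cardinality();
            }
        }
        due.clear(); // headMap is a view, so this also clears the backing map
        delayedMessagesCount -= removed;
        return removed;
    }

    public static void main(String[] args) {
        DelayedCountSketch tracker = new DelayedCountSketch();
        for (int i = 0; i < 10_000; i++) {
            tracker.addMessage(1_000L + i, i % 4, i);
        }
        System.out.println("by iteration: " + tracker.countByIteration());
        System.out.println("by counter:   " + tracker.countByCounter());
    }
}
```

If the iterating count runs once per added message, the total work grows
roughly with the square of the number of delayed messages, which would be
consistent with the CPU profile reported in the thread. The counter variant
trades that for a small amount of bookkeeping on every add and remove path.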
