Hi!

I must clearly have miscommunicated or undercommunicated the issue here.
Or you have not read my email fully.

This is the gist: There is a bug in ActiveMQ. ActiveMQ's default queue
handling mechanism is completely broken if you mix persistent and
non-persistent messages on the same queue.

Priorities are a feature of JMS. Persistent and NonPersistent messages are
a feature of JMS. The default StoreQueueCursor handles this combination incorrectly.

I have explained why. I have suggested a half-way fix, which for most users
probably would be better than the current situation. I have also suggested
the gist of a complete fix.

I have also suggested a *minuscule* change that would let users like me fix
this themselves. That is: Increase the visibility of one (or preferably
two) classes to the same visibility that the other classes in that package have,
so that it is possible to "roll your own" StoreQueueCursor and configure
ActiveMQ to use it.
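
To illustrate, this is roughly what such a custom cursor would plug into. The
policy classes below are existing ActiveMQ API
(org.apache.activemq.broker.region.policy); 'MyPendingQueuePolicy' and
'MyStoreQueueCursor' are hypothetical names, and the latter is exactly the
StoreQueueCursor variant that cannot be written cleanly outside the package today:

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;

public class MyPendingQueuePolicy implements PendingQueueMessageStoragePolicy {
    @Override
    public PendingMessageCursor getQueuePendingMessageCursor(Broker broker, Queue queue) {
        // 'MyStoreQueueCursor' is hypothetical: a StoreQueueCursor variant with a
        // fixed getNextCursor(), which today cannot be implemented outside the package.
        return new MyStoreQueueCursor(broker, queue);
    }
}

// Wiring it up, e.g. when setting up an embedded broker:
BrokerService brokerService = new BrokerService();
PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");
entry.setPendingQueuePolicy(new MyPendingQueuePolicy());
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(entry);
brokerService.setDestinationPolicy(policyMap);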

I do not use, and did not say I use, page and prefetch = 1.

Yes, I can obviously "fix" this by using multiple queues. I will do that.
But I would also like ActiveMQ not to be broken by default.

I am really trying to help here. I can submit code, but only if I have a
sense that someone on the other side will actually look at it.

Thanks,
Endre.


On Mon, Jun 19, 2023 at 4:01 PM Matt Pavlovich <mattr...@gmail.com> wrote:

> Hi Endre-
>
> Contributions are always welcome =).  Sample unit test scenarios and/or
> PRs that demonstrate the issue and an approach to solve for it.
>
> That being said, keep in mind that any system that deals with the big
> three— network I/O, disk I/O & CPU (like messaging and databases) always
> has to accept some level of trade-offs to accommodate different use cases
> and maintain efficiency.
>
> For this specific use case— this feels like one of those scenarios where
> the solution is better served by segmenting the message flow into separate
> queues by priority (and optionally priority+persistence mode). The
> statements about wanting priority and message order maintained are
> conflicting by nature. It suggests that this isn’t one single message flow,
> but "an application publishing several related message flows to one
> destination.”
>
> If the volume of the use case can operate with pageSize=1 and prefetch=1,
> then message volume must be nominal.
>
> Recommendation:
>
> 1. Use a server-side CompositeQueue to fan out the messages by priority
> (and possibly priority + persistence) to separate queues using filter
> pattern (or use Camel, etc)
>
>     ie. queue://mats3.events -> filter and fan out to ->
> queue://mats3.prio.0, queue://mats3.prio.1, queue://mats3.prio.2,
> queue://mats3.prio.3, etc..
>
> 2. Optionally, (if multiple consumers are not feasible) use a client-side
> CompositeQueue (or wildcard queue) to have the application read from all
> the queues at once.
>
>      ie.
> session.createConsumer(session.createQueue("queue://mats3.prio.*"));
>
> This approach always puts the ’next’ message by priority into a queue, and
> the consumers can be separately scaled and managed. You may want a dedicated
> thread for priority = 0, or adjust prefetch, etc.
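>
> For recommendation 1, the broker-side config would look roughly like this in
> activemq.xml (queue names and selectors are just illustrative):
>
> <destinationInterceptors>
>   <virtualDestinationInterceptor>
>     <virtualDestinations>
>       <compositeQueue name="mats3.events" forwardOnly="true">
>         <forwardTo>
>           <filteredDestination selector="JMSPriority = 9" queue="mats3.prio.9"/>
>           <filteredDestination selector="JMSPriority &lt; 9" queue="mats3.prio.normal"/>
>         </forwardTo>
>       </compositeQueue>
>     </virtualDestinations>
>   </virtualDestinationInterceptor>
> </destinationInterceptors>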
>
> I suspect if you break up this “one flow” into “several flows” that your
> issues will be resolved and you won’t need to patch or maintain any changes
> to ActiveMQ itself.
>
> Hope this helps!
>
> Thanks,
> Matt Pavlovich
>
> > On Jun 18, 2023, at 6:17 AM, Endre Stølsvik <en...@stolsvik.com> wrote:
> >
> > Hi!
> >
> > tl;dr: It would be great if the
> > 'org.apache.activemq.broker.region.cursors.*QueueStorePrefetch*' could have its
> > visibility increased from package to public (take along the corresponding
> > *TopicStorePrefetch* too while at it!). This is so that I can make an
> > alternative *StoreQueueCursor*-variant outside of the package, i.e. in my own
> > code, and thus get around a starvation issue that this class exhibits when
> > mixing persistent and non-persistent messages, which cannot be overcome by
> > priorities.
> >
> > I am researching, and have identified, a problem with the StoreQueueCursor.
> > I've found a RedHat issue from 2014 (
> > https://issues.redhat.com/browse/ENTMQ-872) which describes the issue I
> > angled in from (persistent messages are always "preferred" in front of
> > non-persistent messages, even if the latter have high priority) - but it
> > turns out the problem is deeper than that.
> >
> > As described on the message-cursors page (
> > https://activemq.apache.org/message-cursors), in the "paging for
> > non-persistent messages" section and image, the store queue cursor has
> two
> > "buckets" to handle persistent and non-persistent messages.
> >
> > The problem arises from how it handles fetching messages from these two
> > buckets. Basically, it switches between these two buckets *only when the
> > current bucket is (effectively) empty.*
> >
> > This affects ordering *(if a producer alternates between persistent
> > and non-persistent messages, they will not be consumed in order, as the
> > "current bucket" will be emptied first)*, and can lead to starvation *(the
> > "current bucket" is emptied before switching, so if producers keep up, you
> > will never get a message from the "opposite" bucket)*, and thus it also
> > effectively ignores prioritization *(since it doesn't even consider the
> > opposite bucket while the current is non-empty)*.
> >
> > My situation is that in the library Mats3 (https://mats3.io/), one often
> > employs "interactive" messages (priority=9) *combined with* non-persistent
> > messaging - on the same queues. This then obviously leads to the completely
> > opposite result of the intention: The supposedly "fast, prioritized, but
> > not entirely reliable" safe or idempotent GET-style messages/commands will
> > be starved if there is also a batch of "ordinary" messages going on using
> > the same queues.
> >
> > I have come up with a minimal solution that fixes *my* problem: I need to
> > remove the starvation, and thus the ignoring of prioritization. But this
> > solution will possibly make the dispatch in-order situation worse. What I do
> > is to change the 'getNextCursor()' method to *always* alternate between
> > the buckets if there are messages in both. That is, if there are messages
> > in the opposite bucket, then switch. This fixes much - *and is probably
> > better for most users,* without any discernible side effects.
> >
> > If the QueueStorePrefetch class were public, as all other classes in this
> > package except the corresponding TopicStorePrefetch are, then I could have
> > made an alternate implementation in my own code. I have actually
> > successfully made an extension of the class now by using reflection on the
> > private fields and overriding the 'getNextCursor()' method (which luckily
> > is protected), but this is obviously not ideal.
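> >
> > For reference, that workaround is roughly shaped like this (the field names
> > 'persistent', 'nonPersistent' and 'currentCursor' are from the StoreQueueCursor
> > source quoted below; the class is just a sketch of what I described, not
> > polished code):
> >
> > import java.lang.reflect.Field;
> >
> > import org.apache.activemq.broker.Broker;
> > import org.apache.activemq.broker.region.Queue;
> > import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
> > import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
> >
> > public class AlternatingStoreQueueCursor extends StoreQueueCursor {
> >
> >     public AlternatingStoreQueueCursor(Broker broker, Queue queue) {
> >         super(broker, queue);
> >     }
> >
> >     @Override
> >     protected synchronized PendingMessageCursor getNextCursor() throws Exception {
> >         // The superclass' fields are private, hence the reflection.
> >         PendingMessageCursor persistent = cursorField("persistent");
> >         PendingMessageCursor nonPersistent = cursorField("nonPersistent");
> >         PendingMessageCursor currentCursor = cursorField("currentCursor");
> >         if (nonPersistent == null) {
> >             return currentCursor;
> >         }
> >         // Same logic as the modified getNextCursor() shown further down:
> >         // always switch if the opposite bucket has messages.
> >         PendingMessageCursor opposite = currentCursor == persistent ? nonPersistent : persistent;
> >         if (opposite.hasNext()) {
> >             field("currentCursor").set(this, opposite);
> >             return opposite;
> >         }
> >         return currentCursor;
> >     }
> >
> >     private PendingMessageCursor cursorField(String name) throws Exception {
> >         return (PendingMessageCursor) field(name).get(this);
> >     }
> >
> >     private static Field field(String name) throws Exception {
> >         Field field = StoreQueueCursor.class.getDeclaredField(name);
> >         field.setAccessible(true);
> >         return field;
> >     }
> > }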
> >
> > In more detail:
> >
> > The problem is the combination of these three methods:
> >
> > @Override
> > public synchronized boolean hasNext() {
> >     try {
> >         getNextCursor();
> >     } catch (Exception e) {
> >         LOG.error("Failed to get current cursor ", e);
> >         throw new RuntimeException(e);
> >     }
> >     return currentCursor != null ? currentCursor.hasNext() : false;
> > }
> >
> > @Override
> > public synchronized MessageReference next() {
> >     MessageReference result = currentCursor != null ? currentCursor.next() : null;
> >     return result;
> > }
> >
> > protected synchronized PendingMessageCursor getNextCursor() throws Exception {
> >     if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
> >         currentCursor = currentCursor == persistent ? nonPersistent : persistent;
> >         // sanity check
> >         if (currentCursor.isEmpty()) {
> >             currentCursor = currentCursor == persistent ? nonPersistent : persistent;
> >         }
> >     }
> >     return currentCursor;
> > }
> >
> >
> > If I change the getNextCursor to this, most things get better:
> >
> > protected synchronized PendingMessageCursor getNextCursor() throws Exception {
> >     // ?: Sanity check that nonPersistent has been set, i.e. that start() has been invoked.
> >     if (nonPersistent == null) {
> >         // -> No, not set, so don't switch currentCursor to it - so that currentCursor never becomes null.
> >         return currentCursor;
> >     }
> >
> >     // Get opposite cursor:
> >     PendingMessageCursor oppositeCursor = currentCursor == persistent ? nonPersistent : persistent;
> >     // ?: Do we have any messages in the opposite?
> >     if (oppositeCursor.hasNext()) {
> >         // -> Yes, so do the switch
> >         currentCursor = oppositeCursor;
> >     }
> >     return currentCursor;
> > }
> >
> > .. but with this, producing a bunch of persistent messages, and then
> > non-persistent, will lead to them being fetched alternately (even though
> > you wanted all the persistent first, then non-persistent). Then again, if
> > you did the opposite - produced a bunch of non-persistent, then persistent -
> > the current solution will first dequeue all the persistent. So, it's bad
> > anyhow.
> >
> > (As an aside, not for now: IMHO the defensive null-checking prevalent in
> > this class should also be removed.)
> >
> > Note: A complete solution would require a "peek" functionality to see which
> > bucket has the highest priority message, and a way to determine the order
> > between two messages when their priority is equal. You'd then always switch
> > to the bucket with the "true next" message.
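> >
> > Such a selection could look roughly like this (note: PendingMessageCursor has
> > no peek() today, so the two peek calls are hypothetical, and the tie-breaker
> > on broker enqueue time is just one possible choice):
> >
> > // Pick the bucket holding the "true next" message: highest priority first,
> > // then broker enqueue order as tie-breaker between the two heads.
> > MessageReference p = persistent.peek();      // hypothetical
> > MessageReference n = nonPersistent.peek();   // hypothetical
> > if (p == null || n == null) {
> >     return p != null ? persistent : nonPersistent;
> > }
> > int byPriority = Byte.compare(n.getMessage().getPriority(), p.getMessage().getPriority());
> > if (byPriority != 0) {
> >     return byPriority > 0 ? nonPersistent : persistent;
> > }
> > return p.getMessage().getBrokerInTime() <= n.getMessage().getBrokerInTime()
> >         ? persistent : nonPersistent;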
> >
> > *Note that to easily experience this, you should set both the maxPageSize
> > and client prefetch to 1.* Otherwise, it seems like several of these issues
> > are *masked* by either the layer above, or on the client - i.e. it
> > reorders, and takes into consideration the prioritization. However, when
> > you produce thousands of messages, the page size of 200 and prefetch of
> > 1000 cannot mask it anymore, and the problem shows itself (in production,
> > obviously!). But it is harder to observe, and reason about, such large
> > amounts of messages, thus setting these values to 1 gives you the full
> > experience right away.
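> >
> > For reference, this is roughly how those two knobs can be set (the policy
> > classes and the connection URL option are existing ActiveMQ API;
> > 'brokerService' is assumed to be the embedded BrokerService under test):
> >
> > // Broker side: page in one message at a time (default maxPageSize is 200).
> > PolicyEntry entry = new PolicyEntry();
> > entry.setQueue(">");
> > entry.setMaxPageSize(1);
> > PolicyMap policyMap = new PolicyMap();
> > policyMap.setDefaultEntry(entry);
> > brokerService.setDestinationPolicy(policyMap);
> >
> > // Client side: queue prefetch of 1 (default is 1000).
> > ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
> >         "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1");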
> >
> > I do have some code and tests for this if interesting - but for now, it
> > would be really nice if this one rather non-intrusive change was
> > implemented ASAP, maybe even before 5.18.2? - so that I can implement a
> > better fix on our side than heavy-handed reflection of private fields: That
> > is, I humbly request that the classes QueueStorePrefetch (and corresponding
> > TopicStorePrefetch) are made public, so that one can make alternative
> > implementations outside of the ActiveMQ code.
> >
> > PS: It seems like the corresponding topic side of this,
> > StoreDurableSubscriberCursor with TopicStorePrefetch, has had some work
> > done for it in 2010 for probably the same type of issue, adding an
> > "immediatePriorityDispatch" flag and corresponding functionality: *"ensure
> > new high priority messages get dispatched immediately rather than at the
> > end of the next batch, configurable via
> > PendingDurableSubscriberMessageStoragePolicy.immediatePriorityDispatch
> > default true, most relevant with prefetch=1"*. I don't fully understand
> > this solution, but can't shake the feeling that it is a literal patch
> > instead of handling the underlying problem: Dequeuing from two underlying
> > queues ("buckets") must take into account the head of both, finding the
> > "true next" wrt. order and priority.
> >
> > Thanks a bunch,
> > Kind regards,
> > Endre Stølsvik.
>
>
