Hi Endre-

Contributions are always welcome =). Sample unit test scenarios and/or PRs 
that demonstrate the issue and an approach to solving it are a great way to 
move this forward.

That being said, keep in mind that any system that deals with the big three 
(network I/O, disk I/O and CPU), as messaging systems and databases do, always 
has to accept some level of trade-off to accommodate different use cases and 
maintain efficiency.

For this specific use case, this feels like one of those scenarios where the 
solution is better served by segmenting the message flow into separate queues 
by priority (and optionally priority + persistence mode). The statements about 
wanting both priority and message order maintained are conflicting by nature. 
It suggests that this isn't one single message flow, but rather "an application 
publishing several related message flows to one destination."

If the use case can operate with pageSize=1 and prefetch=1, then the message 
volume must be nominal.

Recommendation:

1. Use a server-side CompositeQueue to fan out the messages by priority (and 
possibly priority + persistence) to separate queues using a filter pattern (or 
use Camel, etc.)

    i.e. queue://mats3.events -> filter and fan out to -> queue://mats3.prio.0, 
queue://mats3.prio.1, queue://mats3.prio.2, queue://mats3.prio.3, etc.
 
2. Optionally (if multiple consumers are not feasible), use a client-side 
CompositeQueue (or a wildcard queue) to have the application read from all the 
queues at once.

     i.e. session.createConsumer(session.createQueue("queue://mats3.prio.*"));
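
For reference, step 1 can be sketched as a virtual destination in activemq.xml; 
the selector split and queue names below are illustrative only, and you would 
tune the priority ranges to your actual flows:

```xml
<!-- Illustrative sketch: fan out queue://mats3.events by JMSPriority -->
<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <compositeQueue name="mats3.events" forwardOnly="true">
        <forwardTo>
          <filteredDestination selector="JMSPriority = 9" queue="mats3.prio.9"/>
          <filteredDestination selector="JMSPriority BETWEEN 1 AND 8" queue="mats3.prio.mid"/>
          <filteredDestination selector="JMSPriority = 0" queue="mats3.prio.0"/>
        </forwardTo>
      </compositeQueue>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>
```

With forwardOnly="true", nothing stays on the composite queue itself; every 
message lands on exactly one of the per-priority queues from step 2.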

This approach always puts the 'next' message by priority into a queue, and the 
consumers can be separately scaled and managed. You may want a dedicated thread 
for priority = 0, or to adjust prefetch, etc.

I suspect that if you break up this “one flow” into “several flows”, your 
issues will be resolved and you won’t need to patch or maintain any changes to 
ActiveMQ itself.

Hope this helps!

Thanks,
Matt Pavlovich

> On Jun 18, 2023, at 6:17 AM, Endre Stølsvik <en...@stolsvik.com> wrote:
> 
> Hi!
> 
> tl;dr: It would be great if the 'org.apache.activemq.broker.region.cursors.
> *QueueStorePrefetch*' could have its visibility increased from package to
> public (take along the corresponding *TopicStorePrefetch* too while at
> it!). This so that I can make an alternative *StoreQueueCursor*-variant
> outside of the package, i.e. in my own code. This so that I can get around
> a starvation issue that this class exhibits when mixing persistent and
> non-persistent messages, which cannot be overcome by priorities.
> 
> I am researching, and have identified, a problem with the StoreQueueCursor.
> I've found a RedHat issue from 2014 (
> https://issues.redhat.com/browse/ENTMQ-872) which describes the issue I
> angled in from (persistent messages are always "preferred" in front of
> non-persistent messages, even if the latter have high priority) - but it
> turns out the problem is deeper than that.
> 
> As described on the message-cursors page (
> https://activemq.apache.org/message-cursors), in the "paging for
> non-persistent messages" section and image, the store queue cursor has two
> "buckets" to handle persistent and non-persistent messages.
> 
> The problem arises from how it handles fetching messages from these two
> buckets. Basically, it switches between these two buckets *only when the
> current bucket is (effectively) empty.*
> 
> This affects ordering *(if you on a producer alternate between persistent
> and non-persistent messages, they will not be consumed in order, as the
> "current bucket" will be emptied first)*, and can lead to starvation *(the
> "current bucket" is emptied before switching, so if producers keep up, you
> will never get a message from the 'opposite" bucket)*, and also thus
> effectively ignores prioritization *(since it doesn't even consider the
> opposite bucket while the current is non-empty).*
> 
> My situation is that in the library Mats3 (https://mats3.io/), one often
> employs "interactive" messages (priority=9) *combined with* non-persistent
> messaging - on the same queues. This then obviously leads to the completely
> opposite result than the intention: The supposedly "fast, prioritized, but
> not entirely reliable" safe or idempotent GET-style messages/commands will
> be starved if there also are a batch of "ordinary" messages going on using
> the same queues.
> 
> I have come up with a minimal solution that fixes *my* problem: I need to
> remove the starvation, and thus the ignoring of prioritization. But this
> solution will possibly make the dispatch in-order situation worse. What I
> do, is to change the 'getNextCursor()' method to *always* alternate between
> the buckets if there are messages in both. That is, if there are messages
> in the opposite bucket, then switch. This fixes much - *and is probably
> better for most users, *without any discernible side effects.
> 
> If the QueueStorePrefetch class was public, as all other classes in this
> package except the corresponding TopicStorePrefetch is, then I could have
> made an alternate implementation in my own code. I have actually
> successfully made an extension of the class now by using reflection on the
> private fields and overriding the 'getNextCursor()' method (which luckily
> is protected), but this is obviously not ideal.
> 
> More detailed:
> 
> The problem is the combination of these three methods:
> 
> @Override
> public synchronized boolean hasNext() {
>     try {
>         getNextCursor();
>     } catch (Exception e) {
>         LOG.error("Failed to get current cursor ", e);
>         throw new RuntimeException(e);
>     }
>     return currentCursor != null ? currentCursor.hasNext() : false;
> }
> 
> @Override
> public synchronized MessageReference next() {
>     MessageReference result = currentCursor != null ? currentCursor.next() : null;
>     return result;
> }
> 
> 
> protected synchronized PendingMessageCursor getNextCursor() throws Exception {
>     if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
>         currentCursor = currentCursor == persistent ? nonPersistent : persistent;
>         // sanity check
>         if (currentCursor.isEmpty()) {
>             currentCursor = currentCursor == persistent ? nonPersistent : persistent;
>         }
>     }
>     return currentCursor;
> }
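
To see the starvation without a broker in the loop, the switching rule above can 
be modeled with two plain in-memory queues (a simplified sketch; the class and 
message names are made up, and the real cursor does much more):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal model of StoreQueueCursor's bucket switching (hypothetical names).
public class BucketSwitchDemo {
    static Deque<String> persistent = new ArrayDeque<>();
    static Deque<String> nonPersistent = new ArrayDeque<>();
    static Deque<String> current = persistent;

    // Mirrors getNextCursor(): only switch buckets when the current one is empty.
    static Deque<String> getNextBucket() {
        if (current.isEmpty()) {
            current = (current == persistent) ? nonPersistent : persistent;
            // sanity check, as in the original
            if (current.isEmpty()) {
                current = (current == persistent) ? nonPersistent : persistent;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        nonPersistent.add("np-1"); // a high-priority, non-persistent message waiting
        // A producer keeps the persistent bucket non-empty while we consume:
        for (int i = 0; i < 5; i++) {
            persistent.add("p-" + i);
            System.out.println("dispatched: " + getNextBucket().poll());
        }
        // np-1 was never dispatched: the cursor never switched buckets.
        System.out.println("still waiting: " + nonPersistent); // prints "still waiting: [np-1]"
    }
}
```

As long as the producer keeps the current bucket non-empty, the opposite bucket 
is never even considered, which is exactly the starvation described above.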
> 
> 
> If I change the getNextCursor to this, most things get better:
> 
> protected synchronized PendingMessageCursor getNextCursor() throws Exception {
>     // ?: Sanity check that nonPersistent has been set, i.e. that start() has been invoked.
>     if (nonPersistent == null) {
>         // -> No, not set, so don't switch currentCursor to it - so that currentCursor never becomes null.
>         return currentCursor;
>     }
> 
>     // Get opposite cursor:
>     PendingMessageCursor oppositeCursor = currentCursor == persistent ? nonPersistent : persistent;
>     // ?: Do we have any messages in the opposite?
>     if (oppositeCursor.hasNext()) {
>         // -> Yes, so do the switch
>         currentCursor = oppositeCursor;
>     }
>     return currentCursor;
> }
> 
> .. but with this, producing a bunch of persistent messages, and then
> non-persistent, will lead to them being fetched alternating (even though
> you wanted all the persistent first, then non-persistent). Then again, if
> you did the opposite - produced a bunch of non-persistent, then persistent
> - the current solution will first dequeue all the persistent. So, it's bad
> anyhow.
> 
> (As an aside, not for now: IMHO the defensive null-checking prevalent in
> this class should also be removed.)
> 
> Note: A complete solution would require a "peek" functionality to see which
> bucket had the highest priority message, and identify a way to find the
> order between two messages when their priority was equal. You'd then always
> switch to the bucket with the "true next" message.
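
That peek-and-compare could be sketched outside ActiveMQ roughly as follows 
(a hypothetical model, not the actual cursor API): compare the two bucket heads 
by priority, breaking ties by enqueue order:

```java
import java.util.Comparator;

// Sketch of picking the "true next" of two bucket heads (hypothetical model,
// not ActiveMQ API): higher JMSPriority wins; ties broken by enqueue sequence.
public class TrueNextDemo {
    record Head(int priority, long sequence) {}

    static final Comparator<Head> TRUE_NEXT =
            Comparator.comparingInt((Head h) -> -h.priority)   // higher priority first
                      .thenComparingLong(h -> h.sequence);     // then FIFO by enqueue order

    // True if the persistent head should be dispatched before (or tied with)
    // the non-persistent head.
    static boolean persistentFirst(Head persistentHead, Head nonPersistentHead) {
        return TRUE_NEXT.compare(persistentHead, nonPersistentHead) <= 0;
    }

    public static void main(String[] args) {
        Head p = new Head(4, 1);   // persistent, mid priority, enqueued first
        Head np = new Head(9, 2);  // non-persistent, high priority, enqueued later
        System.out.println(persistentFirst(p, np)); // prints "false": priority 9 goes first
    }
}
```

The hard part in the real broker is of course getting a cheap, consistent peek 
at both heads (one of which may be paged in from the store), not the comparison 
itself.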
> 
> *Note that to easily experience this, you should set both the maxPageSize
> and client prefetch to 1. *Otherwise, it seems like several of these issues
> are *masked* by either the layer above, or on the client - i.e. it
> reorders, and takes into consideration the prioritization. However, when
> you produce thousands of messages, the page size of 200 and prefetch of
> 1000 cannot mask it anymore, and the problem shows itself (in production,
> obviously!). But it is harder to observe, and reason about, such large
> amounts of messages, thus setting these values to 1 gives you the full
> experience right away.
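
For anyone wanting to reproduce this quickly, both knobs are standard 
configuration; a sketch (broker-side policy in activemq.xml, client-side 
prefetch via the connection URL, values as suggested above):

```xml
<!-- broker side (activemq.xml): page one message at a time -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" maxPageSize="1"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

<!-- client side: set prefetch on the broker URL, e.g.
     tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 -->
```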
> 
> I do have some code and tests for this if interesting - but for now, it
> would be really nice if this one rather non-intrusive change was
> implemented ASAP, maybe even before 5.18.2? - so that I can implement a
> better fix on our side than heavy-handed reflection of private fields: That
> is, I humbly request that the classes QueueStorePrefetch (and corresponding
> TopicStorePrefetch) are made public, so that one can make alternative
> implementations outside of the ActiveMQ code.
> 
> PS: It seems like the corresponding topic side of this,
> StoreDurableSubscriberCursor with TopicStorePrefetch, has had some work
> done for it in 2010 for probably the same type of issue, adding a
> "immediatePriorityDispatch" flag and corresponding functionality: *"ensure
> new high priority messages get dispatched immediately rather than at the
> end of the next batch, configurable via
> PendingDurableSubscriberMessageStoragePolicy.immediatePriorityDispatch
> default true, most relevant with prefetch=1"*. I don't fully understand
> this solution, but can't shake the feeling that it is a literal patch
> instead of handling the underlying problem: Dequeuing from two underlying
> queues ("buckets") must take into account the head of both, finding the
> "true next" wrt. order and priority.
> 
> Thanks a bunch,
> Kind regards,
> Endre Stølsvik.
