[ 
https://issues.apache.org/jira/browse/AMQ-6658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16026716#comment-16026716
 ] 

Fabian González commented on AMQ-6658:
--------------------------------------

I think I found what causes the issue concerning the wrong order after 
redelivery.

There are two active threads which are involved in an apparent race condition:

ActiveMQConnection[xx]Scheduler
ActiveMQ Session Task

1) A Session Task thread verifies in ActiveMQSessionExecutor.iterate that there 
are messages queued on the session (as there are 60 messages prefetched).
2) It dispatchs the threads through ActiveMQMessageConsumer
3) if unconsumedMessages is not running in ActiveMQMessageConsumer.dispatch, it 
enqueues the message in unconsumedMessages and it is not redelivered. 

So if ActiveMQ Session Task polls messages several times before 
ActiveMQConnection[xx]Scheduler starts unconsumedMessages in 
ActiveMQMessageConsumer:1889 (unconsumedMessages.start();), there would be 
several prefetched unconsumed messages.

The problem arises if ActiveMQ Session Task polls messages several times, it 
polls a new message, and then  ActiveMQConnection[xx]Scheduler. In that case 
unconsumedMessages is running, so the message is delivered (but the first 
message which was enqueued in unconsumedMessage was the one which should have 
been delivered first).


I think the dispatch message has to deliver always the first mesage oldest 
message which was enqueued in unconsumedMessages.

For example in the dispatch method in ActiveMQMessageConsumer, something like 
this should be done:

    @Override
    public void dispatch(MessageDispatch md)
    {
        MessageListener listener = this.messageListener.get();
        try
        {
            clearMessagesInProgress();
            clearDeliveredList();
            synchronized (unconsumedMessages.getMutex())
            {
                if (!unconsumedMessages.isClosed())
                {
                    if (this.info.isBrowser() || 
!session.connection.isDuplicate(this, md.getMessage()))
                    {
                        if (listener != null && unconsumedMessages.isRunning())
                        {
                            // otherwise I do not preserve the order for
                            // redelivery
                            unconsumedMessages.enqueue(md);
                            md = unconsumedMessages.dequeueNoWait();

 

> Messenger does not respect order after redelivery
> -------------------------------------------------
>
>                 Key: AMQ-6658
>                 URL: https://issues.apache.org/jira/browse/AMQ-6658
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.14.3
>            Reporter: Fabian González
>
> I am facing a situation  where activemq seems not to respect the order for 
> dispatched messages when a redilevery is needed using activemq-client 5.14.3.
> I am sending 60 messages to a queue with a single consumer, and I've noticed 
> that sometimes when a relivery of the message is needed, as a consequence of 
> rollback, another message from those 60 message is served before the 
> redelivered message. There is no maxRedelivery set.
> What I notice debugging ActiveMQMessageConsumer is that the following 
> behaviour may occur:
> - The 60 messages are dispatched in order in:
> ActiveMQMessageConsumer:1376:
>     @Override
>     public void dispatch(MessageDispatch md) {
>         MessageListener listener = this.messageListener.get();
>         try {
>             clearMessagesInProgress();
>             ...
> unconsumedMessage is running so the message is sent to the listener.
> - a rollback is performed and the message is redelivered (with a default 
> delay):
> ActiveMQMessageConsumer:1305:
>                         if (redeliveryDelay > 0 && 
> !unconsumedMessages.isClosed()) {
>                             // Start up the delivery again a little later.
>                             session.getScheduler().executeAfterDelay(new 
> Runnable() {
>                                 @Override
>                                 public void run() {
>                                     try {
>                                         if (started.get()) {
>                                             start();
>                                         }
>                                     } catch (JMSException e) {
>                                         
> session.connection.onAsyncException(e);
>                                     }
>                                 }
>                             }, redeliveryDelay);
>                         } else {
>                             start();
>                         }
> Periodically, the messages enqueued in the session are attempted to be 
> consumed (as the unconsumedMessages from the consumer is not running they are 
> not sent to the listener to be consumed and they are enqueued as 
> unconsumedMessages).
> But if the thread scheduled from redelivery is started when the iteration 
> from the unconsumed messages is being performed, the unconsumedMessages is 
> started in:
>     public void start() throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         started.set(true);
>         unconsumedMessages.start();
>         session.executor.wakeup();
>     }
> and the message that is being considered from session (in the other thread) 
> is sent to the listener before the redelivered message, which may be an error 
> in order.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to