[ 
https://issues.apache.org/jira/browse/AMQ-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13458647#comment-13458647
 ] 

Claus Ibsen commented on AMQ-3539:
----------------------------------

Martin, thanks for reporting and with the details you do.
Have you been running with your "patch" in production for a while? And did that 
fix it for you?


                
> Prefetch state can be incorrect when transacted redelivery of duplicates 
> occurs, causing stalled queue
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3539
>                 URL: https://issues.apache.org/jira/browse/AMQ-3539
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, JMS client
>    Affects Versions: 5.5.0
>            Reporter: Martin Serrano
>            Priority: Critical
>
> * In ActiveMQMessageConsumer, delivery acks are generated by receive() calls 
> and by the dispatch() method when transacted redelivery of duplicates occurs. 
>  These delivery acks are consolidated by calling ackLater which batches them 
> up using first/last message id and sends the acks as appropriate w.r.t. the 
> prefetch size.
> * On the broker, the prefetch window is extended by checking the last message 
> id, finding where it is in the dispatched queue and incrementing the 
> prefetchExtension accordingly.  This algorithm depends on the consumer 
> maintaining the dispatch order in its consolidated delivery acks.
> * When the transacted redelivery occurs, it happens in a separate thread than 
> the receive, operating on the latest delivered message.  The delivery acks 
> from the receive thread are arbitrarily delayed (but in order of dispatch) 
> depending on client action.  The mixing of these can result in an out of 
> order consolidated delivery ack.
> Real example (the client and broker logs are mixed to make it easier to 
> follow; the dispatch logs come from my own custom logging plugin):
> {noformat}
> 2011-10-12 11:51:51,712 TRACE ActiveMqBroker [BrokerService[jmsBroker] 
> Task-3] - Dispatching message [ID:XXX-45585-1318434687322-0:68:1:1:1]
> 2011-10-12 11:51:51,834 TRACE ActiveMqBroker [BrokerService[jmsBroker] 
> Task-43] - Dispatching message [ID:XXX-60241-1318434687532-0:56:1:1:3]
> 2011-10-12 11:51:51,835 TRACE ActiveMqBroker [BrokerService[jmsBroker] 
> Task-43] - Dispatching message [ID:XXX-45585-1318434687322-0:68:1:1:2]
> 2011-10-12 11:51:51,836 DA [ActiveMQ Session Task-1] - 
> ID:XXX-60241-1318434687532-0:33:1:1 tracking transacted redelivery of 
> duplicate: ActiveMQTextMessage {commandId = 827, responseRequired = true, 
> messageId = ID:XXX-45585-1318434687322-0:68:1:1:2, originalDestination = 
> null, originalTransactionId = null, producerId = 
> ID:XXX-45585-1318434687322-0:68:1:1, destination = 
> queue://indexer.index-content-dispatcher, transactionId = null, expiration = 
> 0, timestamp = 1318434696398, arrival = 0, brokerInTime = 1318434696461, 
> brokerOutTime = 1318434711835, correlationId = null, replyTo = null, 
> persistent = true, type = null, priority = 4, groupID = null, groupSequence = 
> 0, targetConsumerId = null, compressed = false, userID = null, content = 
> null, marshalledProperties = org.apache.activemq.util.ByteSequence@305e3ad0, 
> dataStructure = null, redeliveryCounter = 0, size = 0, properties = XXX }
> 2011-10-12 11:51:51,876 TRACE ActiveMqBroker [ActiveMQ Transport: 
> tcp:///127.0.0.1:55235] - firstId: ID:XXX-45585-1318434687322-0:68:1:1:2, 
> lastId: ID:XXX-45585-1318434687322-0:68:1:1:1, dest: 
> queue://indexer.index-content-dispatcher, acktype:0, individualAck:false, 
> deliveredAck:true, messageAck:true, standardAck:false
> 2011-10-12 11:51:51,878 TRACE ActiveMqBroker [BrokerService[jmsBroker] 
> Task-8] - Dispatching message [ID:XXX-60241-1318434687532-0:56:1:1:4]
> 2011-10-12 11:51:51,879 DA [ActiveMQ Session Task-1] - 
> ID:XXX-60241-1318434687532-0:33:1:1 tracking transacted redelivery of 
> duplicate: ActiveMQTextMessage {commandId = 840, responseRequired = true, 
> messageId = ID:XXX-60241-1318434687532-0:56:1:1:4, originalDestination = 
> null, originalTransactionId = null, producerId = 
> ID:XXX-60241-1318434687532-0:56:1:1, destination = 
> queue://indexer.index-content-dispatcher, transactionId = null, expiration = 
> 0, timestamp = 1318434696518, arrival = 0, brokerInTime = 1318434696600, 
> brokerOutTime = 1318434711878, correlationId = null, replyTo = null, 
> persistent = true, type = null, priority = 4, groupID = null, groupSequence = 
> 0, targetConsumerId = null, compressed = false, userID = null, content = 
> null, marshalledProperties = org.apache.activemq.util.ByteSequence@332a4674, 
> dataStructure = null, redeliveryCounter = 0, size = 0, properties = XXX }
> 2011-10-12 11:51:51,903 TRACE ActiveMqBroker [ActiveMQ Transport: 
> tcp:///127.0.0.1:55235] - firstId: ID:XXX-60241-1318434687532-0:56:1:1:4, 
> lastId: ID:XXX-60241-1318434687532-0:56:1:1:3, dest: 
> queue://indexer.index-content-dispatcher, acktype:0, individualAck:false, 
> deliveredAck:true, messageAck:true, standardAck:false
> 2011-10-12 11:51:51,905 TRACE ActiveMqBroker [BrokerService[jmsBroker] 
> Task-8] - Dispatching message [ID:XXX-45585-1318434687322-0:68:1:1:3]
> {noformat}
> In the first ack received by the broker, you can see that message 
> {{68:1:1:2}} is the first id and {{68:1:1:1}} is the last id.  The broker 
> never looks at the first id and will consider this a delivery ack of 
> everything up to {{68:1:1:1}} (which was the first message dispatched).  Thus 
> this mixing results in a incorrect delivery count on the part of the broker.
> An easy fix which would sometimes prematurely extend the prefetch window 
> would be to always send transacted redelivery acks immediately (or 
> consolidate them separately from receive originated delivery acks).  Since 
> transacted redelivery acks always get triggered on messages delivered later 
> than the receive acks this would cause the broker to think that all the 
> earlier messages had been delivered also.  This might be inappropriate with 
> really large prefetch sizes, although this is tempered by the fact that this 
> only occurs in failover situations.  
> Another fix might be to enqueue the transacted redelivered messages and do 
> the filtering of these types of messages in the dequeue method which would 
> result in proper ordering of the delivery acks.
> Anything else would seem to require explicit broker accounting of each 
> delivered message.  I'm willing to try to implement one of these fixes (I'm 
> leaning to the dequeue filtering) but would like some guidance.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to