[ 
https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13188430#comment-13188430
 ] 

Gary Tully commented on AMQ-3664:
---------------------------------

How do you stop the consumer? When the consumer is stopped it needs to be 
shutdown, so if it is cached, it needs to be really closed, because 
optimizeAcknowledge delays acks (to batch them) for a period or till half of 
the prefetch is reached or till it is closed. The expectation is that there 
will be duplicate delivery if the consumer is not closed.
The default auto ack mode, does an immediate ack.

The other alternative is to use transactions to batch your client acks, I think 
this may be the simplest approach. 
Another option is to use client ack mode and periodically call acknowledge, but 
with camel, this would require your own processor to get access to the 
underlying activemq message to call acknowledge.

I think first, validate that the consumer is closed when it is shutdown.

For this case, we may need to introduce a consumer task that does a periodic 
ack, to catch the case where the consumer is cached, there is a pause in 
production, and there are pending acks. But in the abortive case, there may 
still be duplicates.
                
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>
> I make performance test with activemq. When I set optimizeAcknowledge = true 
> I get a dramatic performance improvement, but when I shut down the producer 
> the consumer does not acknowledge all messages! If I stop the consumer and 
> then I start the consumer a second time the consumer recieves messages again 
> and again not all messages will be acknoledged in the queue.
> I am using camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
>     <camel:camelContext id="camelContext">
>       <camel:template id="producerTemplate"/>
>       <camel:consumerTemplate id="consumerTemplate"/>
>       <camel:route>
>               <camel:from 
> uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>               <camel:to uri="beanConsumer"/>
>       </camel:route>
>     </camel:camelContext>
> The config for the ActiveMQComponent:
>     <bean id="jms" 
> class="org.apache.activemq.camel.component.ActiveMQComponent">
>               <property name="connectionFactory">             
>                       <bean 
> class="org.apache.activemq.pool.PooledConnectionFactory">
>                               <property name="connectionFactory">
>                                       <bean 
> class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>                                               <property 
> name="optimizeAcknowledge" value="true"/>
>                                               <property name="dispatchAsync" 
> value="true"/>
>                                               <property name="sendAcksAsync" 
> value="true"/>
>                                               <property name="useAsyncSend" 
> value="true"/>
>                                               <property name="brokerURL" 
> value="nio://138-ham-de:61616"/>                                             
>                                               <property 
> name="useDedicatedTaskRunner" value="false"/> 
>                               </bean> 
>                               </property>
>                       </bean>
>               </property>
>     </bean>
> I think, the problem is here:
> Class ActiveMQMessageConsumer:
>     private void afterMessageIsConsumed(MessageDispatch md, boolean 
> messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * 
> .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + 
> optimizeAckTimeout)) {
>                                       MessageAck ack = 
> makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                       if (ack != null) {
>                                           deliveredMessages.clear();
>                                           ackCounter = 0;
>                                           session.sendAck(ack);
>                                           optimizeAckTimestamp = 
> System.currentTimeMillis();
>                                       }
>                                 }
>                             } else {
>                                 MessageAck ack = 
> makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack!=null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if 
> (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } 
>             else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> What will happen when no producer will send a message to this queue so that 
> no message will pass this method? When will the deliveredMessages been acked?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to