[ 
https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13457723#comment-13457723
 ] 

Timothy Bish commented on AMQ-3664:
-----------------------------------

Added the option optimizedAckScheduledAckInterval to Connection and 
MessageConsumer to allow a configurable asynchronous ack of outstanding 
messages.  By default this option is set to zero, meaning no scheduled async 
acks are sent.  Users who want to ensure that all messages are eventually 
acked can enable it by setting it to a value greater than zero.  
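
To make the effect of the option concrete, here is a minimal, self-contained 
sketch of the idea.  It is a hypothetical model, not the ActiveMQ 
implementation: the class ScheduledAckSketch, its methods, and the explicit 
"now" timestamps are assumptions made for illustration.  Only the 65% prefetch 
threshold and the "zero disables the scheduled ack" behaviour are taken from 
this issue.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a scheduled ack; not ActiveMQ code. */
final class ScheduledAckSketch {
    private final int prefetchSize;
    private final long scheduledAckInterval; // 0 disables the scheduled ack
    private final List<String> delivered = new ArrayList<>();
    private int ackCounter;
    private int ackedTotal;
    private long lastAckTime;

    ScheduledAckSketch(int prefetchSize, long scheduledAckInterval) {
        this.prefetchSize = prefetchSize;
        this.scheduledAckInterval = scheduledAckInterval;
    }

    /** Consume path: acks are batched until 65% of the prefetch is reached. */
    void onMessageConsumed(String id, long now) {
        delivered.add(id);
        ackCounter++;
        if (ackCounter >= prefetchSize * 0.65) {
            flush(now);
        }
    }

    /** Timer path: acks whatever is outstanding, even if no message arrives. */
    void onTimerTick(long now) {
        if (scheduledAckInterval > 0 && !delivered.isEmpty()
                && now - lastAckTime >= scheduledAckInterval) {
            flush(now);
        }
    }

    private void flush(long now) {
        ackedTotal += delivered.size();
        delivered.clear();
        ackCounter = 0;
        lastAckTime = now;
    }

    int pending() { return delivered.size(); }

    int acked() { return ackedTotal; }

    public static void main(String[] args) {
        ScheduledAckSketch off = new ScheduledAckSketch(1000, 0);
        ScheduledAckSketch on = new ScheduledAckSketch(1000, 5000);
        // 100 messages arrive, then the producer stops.
        for (int i = 0; i < 100; i++) {
            off.onMessageConsumed("m" + i, i);
            on.onMessageConsumed("m" + i, i);
        }
        off.onTimerTick(10_000);
        on.onTimerTick(10_000);
        System.out.println("interval=0    pending=" + off.pending());
        System.out.println("interval=5000 pending=" + on.pending());
    }
}
```

In this model the instance with interval 0 keeps its 100 delivered messages 
unacked indefinitely, while the instance with a non-zero interval acks them 
on the next timer tick.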
                
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>             Fix For: 5.7.0
>
>
> I am running performance tests with ActiveMQ. When I set 
> optimizeAcknowledge = true I get a dramatic performance improvement, but 
> when I shut down the producer the consumer does not acknowledge all 
> messages! If I stop the consumer and then start it a second time, the 
> consumer receives messages again, and again not all messages in the queue 
> are acknowledged.
> I am using Camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
> {noformat}
>     <camel:camelContext id="camelContext">
>       <camel:template id="producerTemplate"/>
>       <camel:consumerTemplate id="consumerTemplate"/>
>       <camel:route>
>         <camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>         <camel:to uri="beanConsumer"/>
>       </camel:route>
>     </camel:camelContext>
> {noformat}
> The config for the ActiveMQComponent:
> {noformat}
>     <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
>       <property name="connectionFactory">
>         <bean class="org.apache.activemq.pool.PooledConnectionFactory">
>           <property name="connectionFactory">
>             <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>               <property name="optimizeAcknowledge" value="true"/>
>               <property name="dispatchAsync" value="true"/>
>               <property name="sendAcksAsync" value="true"/>
>               <property name="useAsyncSend" value="true"/>
>               <property name="brokerURL" value="nio://138-ham-de:61616"/>
>               <property name="useDedicatedTaskRunner" value="false"/>
>             </bean>
>           </property>
>         </bean>
>       </property>
>     </bean>
> {noformat}
> I think the problem is here:
> Class ActiveMQMessageConsumer:
> {noformat}
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65)
>                                         || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
>                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = System.currentTimeMillis();
>                                     }
>                                 }
>                             } else {
>                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack!=null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> {noformat}
> What happens when no producer sends a message to this queue, so that no 
> message passes through this method? When will the deliveredMessages be acked?
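
The condition in the quoted method can be isolated to show why acks stall.  
The following is a rough sketch: the class AckConditionSketch, the method 
shouldSendAck, and the timeout value of 300 are hypothetical and chosen only 
for illustration.  The predicate itself mirrors the expression in the quoted 
code.

```java
/** Hypothetical helper that isolates the ack condition from the quoted method. */
final class AckConditionSketch {

    /**
     * Mirrors the quoted condition. In the quoted code it is evaluated only
     * from afterMessageIsConsumed, i.e. only when a message was just consumed.
     */
    static boolean shouldSendAck(int ackCounter, int prefetchSize,
                                 long now, long lastAckTimestamp, long ackTimeout) {
        return ackCounter >= (prefetchSize * .65)
                || now >= (lastAckTimestamp + ackTimeout);
    }

    public static void main(String[] args) {
        int prefetch = 1000;  // threshold is 650 messages
        long timeout = 300;   // assumed value, for illustration only

        // 649 messages consumed shortly after the last ack: nothing is sent.
        System.out.println(shouldSendAck(649, prefetch, 100, 0, timeout)); // false
        // The 650th message crosses the 65% threshold.
        System.out.println(shouldSendAck(650, prefetch, 100, 0, timeout)); // true
        // After the timeout the condition would hold, but it is only checked
        // if another message is consumed to trigger the evaluation.
        System.out.println(shouldSendAck(649, prefetch, 400, 0, timeout)); // true
    }
}
```

The third case is the one described in this issue: the time-based branch would 
allow the ack, but once the producer stops, no further message arrives to 
trigger the check.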

