[ 
https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13453819#comment-13453819
 ] 

Timothy Bish commented on AMQ-3664:
-----------------------------------

The simplest fix would probably be to add a task to the Scheduler instance 
owned by ActiveMQConnection that checks the time since the last ack on the 
consumer and fires an ack if the configured timeout has elapsed.
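
A minimal sketch of that idea, not the actual patch. IdleAckFlusher and its 
methods are hypothetical names, a plain ScheduledExecutorService stands in for 
the connection's Scheduler, and a counter stands in for deliveredMessages and 
session.sendAck(ack):

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a consumer with batched acks; not ActiveMQ code. */
class IdleAckFlusher {
    private final long ackTimeoutMillis;
    private int pendingAcks;      // delivered but not yet acknowledged
    private int ackedMessages;    // total messages covered by sent acks
    private long lastAckTime;     // time of the last ack, in millis

    IdleAckFlusher(long ackTimeoutMillis, long nowMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
        this.lastAckTime = nowMillis;
    }

    /** Called on the consumer thread after each message is processed. */
    synchronized void onMessageConsumed() {
        pendingAcks++;
    }

    /** Periodic check: ack everything pending if the timeout has elapsed. */
    synchronized boolean flushIfIdle(long nowMillis) {
        if (pendingAcks == 0 || nowMillis - lastAckTime < ackTimeoutMillis) {
            return false;
        }
        ackedMessages += pendingAcks; // assumption: stands in for session.sendAck(ack)
        pendingAcks = 0;
        lastAckTime = nowMillis;
        return true;
    }

    synchronized int pendingAcks() { return pendingAcks; }

    synchronized int ackedMessages() { return ackedMessages; }

    /** Runs the idle check at a fixed rate on the given scheduler. */
    ScheduledFuture<?> schedule(ScheduledExecutorService scheduler) {
        return scheduler.scheduleAtFixedRate(
                () -> flushIfIdle(System.currentTimeMillis()),
                ackTimeoutMillis, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

The point of the design is that the check no longer depends on another message 
arriving: the periodic task runs even when the queue is idle. The task should 
be cancelled when the consumer closes.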
                
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>
> I am running performance tests with ActiveMQ. When I set 
> optimizeAcknowledge = true I get a dramatic performance improvement, but 
> when I shut down the producer the consumer does not acknowledge all 
> messages. If I stop the consumer and start it a second time, it receives 
> messages again, and again not all messages in the queue are acknowledged.
> I am using Camel 2.9.0 to produce and consume the messages.
> I am using the consumer template with asyncSendBody.
> The following route is configured in the camelContext:
> {noformat}
>     <camel:camelContext id="camelContext">
>       <camel:template id="producerTemplate"/>
>       <camel:consumerTemplate id="consumerTemplate"/>
>       <camel:route>
>               <camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>               <camel:to uri="beanConsumer"/>
>       </camel:route>
>     </camel:camelContext>
> {noformat}
> The config for the ActiveMQComponent:
> {noformat}
>     <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
>       <property name="connectionFactory">
>         <bean class="org.apache.activemq.pool.PooledConnectionFactory">
>           <property name="connectionFactory">
>             <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>               <property name="optimizeAcknowledge" value="true"/>
>               <property name="dispatchAsync" value="true"/>
>               <property name="sendAcksAsync" value="true"/>
>               <property name="useAsyncSend" value="true"/>
>               <property name="brokerURL" value="nio://138-ham-de:61616"/>
>               <property name="useDedicatedTaskRunner" value="false"/>
>             </bean>
>           </property>
>         </bean>
>       </property>
>     </bean>
> {noformat}
> I think, the problem is here:
> Class ActiveMQMessageConsumer:
> {noformat}
>     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * .65)
>                                         || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
>                                     MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                     if (ack != null) {
>                                         deliveredMessages.clear();
>                                         ackCounter = 0;
>                                         session.sendAck(ack);
>                                         optimizeAckTimestamp = System.currentTimeMillis();
>                                     }
>                                 }
>                             } else {
>                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack != null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> {noformat}
> What happens when no producer sends any more messages to this queue, so 
> that no message passes through this method? When will the remaining 
> deliveredMessages be acked?
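
To make the question concrete, here is a small model of the optimizeAcknowledge 
branch quoted above. It is a simplified sketch, not ActiveMQ code: 
OptimizedAckModel is a hypothetical class, counters stand in for 
deliveredMessages and session.sendAck(ack), the clock is passed in explicitly, 
and the prefetch size and timeout used with it below are illustrative values 
not taken from the report.

```java
/** Simplified model of the optimizeAcknowledge branch; not ActiveMQ code. */
class OptimizedAckModel {
    private final int prefetchSize;
    private final long optimizeAckTimeout;
    private long optimizeAckTimestamp;
    private int ackCounter;
    private int delivered;   // stands in for deliveredMessages.size()
    private int acked;       // messages covered by acks sent so far

    OptimizedAckModel(int prefetchSize, long optimizeAckTimeout, long nowMillis) {
        this.prefetchSize = prefetchSize;
        this.optimizeAckTimeout = optimizeAckTimeout;
        this.optimizeAckTimestamp = nowMillis;
    }

    /**
     * Mirrors the check in afterMessageIsConsumed. Both conditions are only
     * evaluated here, so they are only evaluated when a message is consumed.
     */
    void afterMessageIsConsumed(long nowMillis) {
        delivered++;
        ackCounter++;
        if (ackCounter >= (prefetchSize * .65)
                || nowMillis >= (optimizeAckTimestamp + optimizeAckTimeout)) {
            acked += delivered;   // assumption: stands in for session.sendAck(ack)
            delivered = 0;
            ackCounter = 0;
            optimizeAckTimestamp = nowMillis;
        }
    }

    int unacked() { return delivered; }

    int acked() { return acked; }
}
```

With an assumed prefetch size of 1000 and a timeout of 300 ms, consuming 100 
messages within the first 100 ms leaves all 100 unacked, since neither the 65% 
threshold nor the timeout has been reached. If no further message arrives, 
nothing re-evaluates the condition, and unacked() stays at 100 no matter how 
much time passes. Only the next consumed message triggers the ack.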

