[ 
https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13188421#comment-13188421
 ] 

Matthias Wessel edited comment on AMQ-3664 at 1/18/12 12:15 PM:
----------------------------------------------------------------

When I switch to another cache than CACHE_AUTO or CACHE CONSUMER, I get a 
dramatic deterioration of performance. Then I am faster when I do not use 
ptimizeAcknowledge = true. This cannot be the goal of this setting!
The other problem is I do not want to close the consumer. Yes, I want to close 
the producer, but the consumer should consume all existing messages in the 
queue. The consumer should not care about how many producers produce messages 
to the queue.
                
      was (Author: matw):
    When I switch to another cache than CACHE_AUTO or CACHE CONSUMER, I get a 
dramatic deterioration of performance. Then I am faster when I do not use 
ptimizeAcknowledge = true. This cannot be the goal of this setting!
                  
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical
>
> I make performance test with activemq. When I set optimizeAcknowledge = true 
> I get a dramatic performance improvement, but when I shut down the producer 
> the consumer does not acknowledge all messages! If I stop the consumer and 
> then I start the consumer a second time the consumer recieves messages again 
> and again not all messages will be acknoledged in the queue.
> I am using camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
>     <camel:camelContext id="camelContext">
>       <camel:template id="producerTemplate"/>
>       <camel:consumerTemplate id="consumerTemplate"/>
>       <camel:route>
>               <camel:from 
> uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
>               <camel:to uri="beanConsumer"/>
>       </camel:route>
>     </camel:camelContext>
> The config for the ActiveMQComponent:
>     <bean id="jms" 
> class="org.apache.activemq.camel.component.ActiveMQComponent">
>               <property name="connectionFactory">             
>                       <bean 
> class="org.apache.activemq.pool.PooledConnectionFactory">
>                               <property name="connectionFactory">
>                                       <bean 
> class="org.apache.activemq.spring.ActiveMQConnectionFactory">
>                                               <property 
> name="optimizeAcknowledge" value="true"/>
>                                               <property name="dispatchAsync" 
> value="true"/>
>                                               <property name="sendAcksAsync" 
> value="true"/>
>                                               <property name="useAsyncSend" 
> value="true"/>
>                                               <property name="brokerURL" 
> value="nio://138-ham-de:61616"/>                                             
>                                               <property 
> name="useDedicatedTaskRunner" value="false"/> 
>                               </bean> 
>                               </property>
>                       </bean>
>               </property>
>     </bean>
> I think, the problem is here:
> Class ActiveMQMessageConsumer:
>     private void afterMessageIsConsumed(MessageDispatch md, boolean 
> messageExpired) throws JMSException {
>         if (unconsumedMessages.isClosed()) {
>             return;
>         }
>         if (messageExpired) {
>             synchronized (deliveredMessages) {
>                 deliveredMessages.remove(md);
>             }
>             stats.getExpiredMessageCount().increment();
>             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>         } else {
>             stats.onMessage();
>             if (session.getTransacted()) {
>                 // Do nothing.
>             } else if (isAutoAcknowledgeEach()) {
>                 if (deliveryingAcknowledgements.compareAndSet(false, true)) {
>                     synchronized (deliveredMessages) {
>                         if (!deliveredMessages.isEmpty()) {
>                             if (optimizeAcknowledge) {
>                                 ackCounter++;
>                                 if (ackCounter >= (info.getPrefetchSize() * 
> .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + 
> optimizeAckTimeout)) {
>                                       MessageAck ack = 
> makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                       if (ack != null) {
>                                           deliveredMessages.clear();
>                                           ackCounter = 0;
>                                           session.sendAck(ack);
>                                           optimizeAckTimestamp = 
> System.currentTimeMillis();
>                                       }
>                                 }
>                             } else {
>                                 MessageAck ack = 
> makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
>                                 if (ack!=null) {
>                                     deliveredMessages.clear();
>                                     session.sendAck(ack);
>                                 }
>                             }
>                         }
>                     }
>                     deliveryingAcknowledgements.set(false);
>                 }
>             } else if (isAutoAcknowledgeBatch()) {
>                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
>             } else if 
> (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
>                 boolean messageUnackedByConsumer = false;
>                 synchronized (deliveredMessages) {
>                     messageUnackedByConsumer = deliveredMessages.contains(md);
>                 }
>                 if (messageUnackedByConsumer) {
>                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
>                 }
>             } 
>             else {
>                 throw new IllegalStateException("Invalid session state.");
>             }
>         }
>     }
> What will happen when no producer will send a message to this queue so that 
> no message will pass this method? When will the deliveredMessages been acked?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to