[ https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish updated AMQ-3664:
------------------------------

    Description: 
I am running performance tests with ActiveMQ. When I set
optimizeAcknowledge = true I get a dramatic performance improvement, but when
I shut down the producer the consumer does not acknowledge all messages. If I
stop the consumer and then start it a second time, it receives messages again,
and once more not all messages in the queue are acknowledged.

I am using Camel 2.9.0 to produce and consume the messages.
I am using the consumer Template with asyncSendBody.
The following route is configured in the camelContext:

{noformat}

    <camel:camelContext id="camelContext">
        <camel:template id="producerTemplate"/>
        <camel:consumerTemplate id="consumerTemplate"/>
        <camel:route>
            <camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
            <camel:to uri="beanConsumer"/>
        </camel:route>
    </camel:camelContext>

{noformat}

The config for the ActiveMQComponent:

{noformat}

    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.pool.PooledConnectionFactory">
                <property name="connectionFactory">
                    <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                        <property name="optimizeAcknowledge" value="true"/>
                        <property name="dispatchAsync" value="true"/>
                        <property name="sendAcksAsync" value="true"/>
                        <property name="useAsyncSend" value="true"/>
                        <property name="brokerURL" value="nio://138-ham-de:61616"/>
                        <property name="useDedicatedTaskRunner" value="false"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

{noformat}

I think the problem is here, in class ActiveMQMessageConsumer:

{noformat}

    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            synchronized (deliveredMessages) {
                deliveredMessages.remove(md);
            }
            stats.getExpiredMessageCount().increment();
            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;
                                if (ackCounter >= (info.getPrefetchSize() * .65)
                                        || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        ackCounter = 0;
                                        session.sendAck(ack);
                                        optimizeAckTimestamp = System.currentTimeMillis();
                                    }
                                }
                            } else {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack != null) {
                                    deliveredMessages.clear();
                                    session.sendAck(ack);
                                }
                            }
                        }
                    }
                    deliveryingAcknowledgements.set(false);
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            } else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
    }

{noformat}

What happens when no producer sends any further messages to this queue, so 
that no message passes through this method? When will the remaining 
deliveredMessages be acked?
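
To make the suspected gap concrete, here is a minimal standalone sketch of the 
batching rule quoted above. It is not ActiveMQ code: the class 
OptimizedAckSketch and its methods are hypothetical, the clock is passed in as 
a parameter, and recording the message inside onMessageConsumed is a 
simplifying assumption. It only models the point that the 65% / timeout check 
runs solely when a message is consumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the optimizeAcknowledge batching rule. */
public class OptimizedAckSketch {
    private final int prefetchSize;
    private final long ackTimeoutMillis;
    private final List<String> deliveredMessages = new ArrayList<>();
    private int ackCounter = 0;
    private long lastAckTimestamp;
    private int ackedTotal = 0;

    public OptimizedAckSketch(int prefetchSize, long ackTimeoutMillis, long nowMillis) {
        this.prefetchSize = prefetchSize;
        this.ackTimeoutMillis = ackTimeoutMillis;
        this.lastAckTimestamp = nowMillis;
    }

    /** The threshold check happens only here, i.e. only when a message is consumed. */
    public void onMessageConsumed(String messageId, long nowMillis) {
        deliveredMessages.add(messageId);
        ackCounter++;
        if (ackCounter >= prefetchSize * 0.65
                || nowMillis >= lastAckTimestamp + ackTimeoutMillis) {
            // Stand-in for building and sending one ack for the whole batch.
            ackedTotal += deliveredMessages.size();
            deliveredMessages.clear();
            ackCounter = 0;
            lastAckTimestamp = nowMillis;
        }
    }

    public int pendingAcks() {
        return deliveredMessages.size();
    }

    public int ackedTotal() {
        return ackedTotal;
    }

    public static void main(String[] args) {
        // Assumed values: prefetch of 100 messages, 300 ms ack timeout.
        OptimizedAckSketch consumer = new OptimizedAckSketch(100, 300, 0);
        // The producer sends 70 messages, one per millisecond, then stops.
        for (int i = 0; i < 70; i++) {
            consumer.onMessageConsumed("msg-" + i, i);
        }
        // Nothing calls onMessageConsumed again, so the tail is never flushed.
        System.out.println("acked=" + consumer.ackedTotal()
                + " pending=" + consumer.pendingAcks());
    }
}
```

With these assumed numbers the first 65 messages are acknowledged in one batch 
and the last 5 stay pending, however much time passes afterwards, because 
nothing re-evaluates the timeout once the queue goes idle.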




    
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
>                 Key: AMQ-3664
>                 URL: https://issues.apache.org/jira/browse/AMQ-3664
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
>            Reporter: Matthias Wessel
>            Priority: Critical


        
