[
https://issues.apache.org/jira/browse/AMQ-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13451306#comment-13451306
]
Claus Ibsen commented on AMQ-3664:
----------------------------------
Gary
Yeah maybe a multiplexed background thread could run and acknowledge the
messages if the consumer has been inactive for a period.
I wonder if we can use a single thread for all optimized ack consumers, I would
assume a thread per consumer would bee too much?
> Not all messages will be acknowledged when optimizeAcknowledge is true
> ----------------------------------------------------------------------
>
> Key: AMQ-3664
> URL: https://issues.apache.org/jira/browse/AMQ-3664
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.1
> Environment: Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27
> Reporter: Matthias Wessel
> Priority: Critical
>
> I make performance test with activemq. When I set optimizeAcknowledge = true
> I get a dramatic performance improvement, but when I shut down the producer
> the consumer does not acknowledge all messages! If I stop the consumer and
> then I start the consumer a second time the consumer recieves messages again
> and again not all messages will be acknoledged in the queue.
> I am using camel 2.9.0 to produce and consume the messages.
> I am using the consumer Template with asyncSendBody.
> The following route is configured in the camelContext:
> {noformat}
> <camel:camelContext id="camelContext">
> <camel:template id="producerTemplate"/>
> <camel:consumerTemplate id="consumerTemplate"/>
> <camel:route>
> <camel:from
> uri="jms:queue0?concurrentConsumers=3&maxConcurrentConsumers=10&asyncConsumer=true"/>
> <camel:to uri="beanConsumer"/>
> </camel:route>
> </camel:camelContext>
> The config for the ActiveMQComponent:
> <bean id="jms"
> class="org.apache.activemq.camel.component.ActiveMQComponent">
> <property name="connectionFactory">
> <bean
> class="org.apache.activemq.pool.PooledConnectionFactory">
> <property name="connectionFactory">
> <bean
> class="org.apache.activemq.spring.ActiveMQConnectionFactory">
> <property
> name="optimizeAcknowledge" value="true"/>
> <property name="dispatchAsync"
> value="true"/>
> <property name="sendAcksAsync"
> value="true"/>
> <property name="useAsyncSend"
> value="true"/>
> <property name="brokerURL"
> value="nio://138-ham-de:61616"/>
> <property
> name="useDedicatedTaskRunner" value="false"/>
> </bean>
> </property>
> </bean>
> </property>
> </bean>
> {noformat}
> I think, the problem is here:
> Class ActiveMQMessageConsumer:
> {noformat}
> private void afterMessageIsConsumed(MessageDispatch md, boolean
> messageExpired) throws JMSException {
> if (unconsumedMessages.isClosed()) {
> return;
> }
> if (messageExpired) {
> synchronized (deliveredMessages) {
> deliveredMessages.remove(md);
> }
> stats.getExpiredMessageCount().increment();
> ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> } else {
> stats.onMessage();
> if (session.getTransacted()) {
> // Do nothing.
> } else if (isAutoAcknowledgeEach()) {
> if (deliveryingAcknowledgements.compareAndSet(false, true)) {
> synchronized (deliveredMessages) {
> if (!deliveredMessages.isEmpty()) {
> if (optimizeAcknowledge) {
> ackCounter++;
> if (ackCounter >= (info.getPrefetchSize() *
> .65) || System.currentTimeMillis() >= (optimizeAckTimestamp +
> optimizeAckTimeout)) {
> MessageAck ack =
> makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
> if (ack != null) {
> deliveredMessages.clear();
> ackCounter = 0;
> session.sendAck(ack);
> optimizeAckTimestamp =
> System.currentTimeMillis();
> }
> }
> } else {
> MessageAck ack =
> makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
> if (ack!=null) {
> deliveredMessages.clear();
> session.sendAck(ack);
> }
> }
> }
> }
> deliveryingAcknowledgements.set(false);
> }
> } else if (isAutoAcknowledgeBatch()) {
> ackLater(md, MessageAck.STANDARD_ACK_TYPE);
> } else if
> (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
> boolean messageUnackedByConsumer = false;
> synchronized (deliveredMessages) {
> messageUnackedByConsumer = deliveredMessages.contains(md);
> }
> if (messageUnackedByConsumer) {
> ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
> }
> }
> else {
> throw new IllegalStateException("Invalid session state.");
> }
> }
> }
> {noformat}
> What will happen when no producer will send a message to this queue so that
> no message will pass this method? When will the deliveredMessages been acked?
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira