Author: gtully
Date: Sat Feb 27 10:07:05 2010
New Revision: 916936
URL: http://svn.apache.org/viewvc?rev=916936&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-906 - redeliver (retry) the
message when onMessage throws a RuntimeException under auto-acknowledge
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=916936&r1=916935&r2=916936&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Sat Feb 27 10:07:05 2010
@@ -650,6 +650,7 @@
void doClose() throws JMSException {
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
+ LOG.info("remove: " + this.getConsumerId() + ",
lasteDeliveredSequenceId:" + lastDeliveredSequenceId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
this.session.asyncSendPacket(removeCommand);
}
@@ -1205,14 +1206,15 @@
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
+ LOG.error(getConsumerId() + " Exception while
processing message: " + md.getMessage().getMessageId(), e);
if (isAutoAcknowledgeBatch() ||
isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
- // Redeliver the message
+ // schedual redelivery and possible dlq
processing
+ rollback();
} else {
// Transacted or Client ack: Deliver the
// next message.
afterMessageIsConsumed(md, false);
}
- LOG.error(getConsumerId() + " Exception while
processing message: " + e, e);
}
} else {
unconsumedMessages.enqueue(md);
@@ -1328,14 +1330,7 @@
if (listener != null) {
MessageDispatch md = unconsumedMessages.dequeueNoWait();
if (md != null) {
- try {
- ActiveMQMessage message = createActiveMQMessage(md);
- beforeMessageIsConsumed(md);
- listener.onMessage(message);
- afterMessageIsConsumed(md, false);
- } catch (JMSException e) {
- session.connection.onClientInternalException(e);
- }
+ dispatch(md);
return true;
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?rev=916936&r1=916935&r2=916936&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java Sat Feb 27 10:07:05 2010
@@ -16,6 +16,11 @@
*/
package org.apache.activemq;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -29,6 +34,9 @@
import javax.jms.TextMessage;
import junit.framework.TestCase;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -227,6 +235,97 @@
session.close();
}
+ public void testQueueSessionListenerExceptionRetry() throws Exception {
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("queue-" + getName());
+ MessageProducer producer = createProducer(session, queue);
+ Message message = createTextMessage(session, "1");
+ producer.send(message);
+ message = createTextMessage(session, "2");
+ producer.send(message);
+
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ final CountDownLatch gotMessage = new CountDownLatch(2);
+ final AtomicInteger count = new AtomicInteger(0);
+ final int maxDeliveries =
getRedeliveryPolicy().getMaximumRedeliveries();
+ final ArrayList<String> received = new ArrayList<String>();
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ LOG.info("Message Received: " + message);
+ try {
+ received.add(((TextMessage) message).getText());
+ } catch (JMSException e) {
+ e.printStackTrace();
+ fail(e.toString());
+ }
+ if (count.incrementAndGet() < maxDeliveries) {
+ throw new RuntimeException(getName() + " force a
redelivery");
+ }
+ // new blood
+ count.set(0);
+ gotMessage.countDown();
+ }
+ });
+
+ assertTrue("got message before retry expiry", gotMessage.await(20,
TimeUnit.SECONDS));
+
+ for (int i=0; i<maxDeliveries; i++) {
+ assertEquals("got first redelivered: " + i, "1", received.get(i));
+ }
+ for (int i=maxDeliveries; i<maxDeliveries*2; i++) {
+ assertEquals("got first redelivered: " + i, "2", received.get(i));
+ }
+ session.close();
+ }
+
+
+ public void testQueueSessionListenerExceptionDlq() throws Exception {
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("queue-" + getName());
+ MessageProducer producer = createProducer(session, queue);
+ Message message = createTextMessage(session);
+ producer.send(message);
+
+ ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
+ MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
+ final CountDownLatch gotDlqMessage = new CountDownLatch(1);
+ dlqConsumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ gotDlqMessage.countDown();
+ }
+ });
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ final int maxDeliveries =
getRedeliveryPolicy().getMaximumRedeliveries();
+ final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
+
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ LOG.info("Message Received: " + message);
+ gotMessage.countDown();
+ throw new RuntimeException(getName() + " force a redelivery");
+ }
+ });
+
+ assertTrue("got message before retry expiry", gotMessage.await(20,
TimeUnit.SECONDS));
+
+ // check DLQ
+ assertTrue("got dlq message", gotDlqMessage.await(20,
TimeUnit.SECONDS));
+
+ session.close();
+ }
+
+
+ private TextMessage createTextMessage(Session session, String text) throws
JMSException {
+ return session.createTextMessage(text);
+ }
private TextMessage createTextMessage(Session session) throws JMSException
{
return session.createTextMessage("Hello");
}