Author: chirino
Date: Fri Apr 28 08:11:39 2006
New Revision: 397915
URL: http://svn.apache.org/viewcvs?rev=397915&view=rev
Log:
Applied Rodrigo S de Castro's latest unit test patch and fixed the redelivery
problem. Redelivery was not being delayed when rollback was called from the
message listener.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=397915&r1=397914&r2=397915&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Fri Apr 28 08:11:39 2006
@@ -788,7 +788,7 @@
MessageListener listener = this.messageListener;
try {
if (!unconsumedMessages.isClosed()) {
- if (listener != null && started.get()) {
+ if (listener != null && unconsumedMessages.isRunning() ) {
ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md);
try {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=397915&r1=397914&r2=397915&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Fri Apr 28 08:11:39 2006
@@ -147,7 +147,7 @@
return false;
} else {
dispatch(message);
- return true;
+ return messageQueue.isRunning();
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?rev=397915&r1=397914&r2=397915&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Fri Apr 28 08:11:39 2006
@@ -35,7 +35,8 @@
public class MessageListenerRedeliveryTest extends TestCase {
- private static final Log log = LogFactory.getLog(MessageListenerRedeliveryTest.class);
+ private Log log = LogFactory.getLog(getClass());
+
private Connection connection;
protected void setUp() throws Exception {
@@ -55,8 +56,8 @@
protected RedeliveryPolicy getRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(1000);
- redeliveryPolicy.setBackOffMultiplier((short) 5);
- redeliveryPolicy.setMaximumRedeliveries(10);
+ redeliveryPolicy.setMaximumRedeliveries(2);
+ redeliveryPolicy.setBackOffMultiplier((short) 2);
redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy;
}
@@ -67,59 +68,99 @@
return factory.createConnection();
}
- private class ConsumerMessageListenerTest implements MessageListener {
- private ActiveMQMessageConsumer consumer;
+ private class TestMessageListener implements MessageListener {
+ private Session session;
+
public int counter = 0;
- public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
- this.consumer = consumer;
+ public TestMessageListener(Session session) {
+ this.session = session;
}
public void onMessage(Message message) {
try {
- log.info("Message: " + message);
+ log.info("Message Received: " + message);
counter++;
- if (counter <= 2) {
- log.info("ROLLBACK");
- consumer.rollback();
+ if (counter <= 3) {
+ log.info("Message Rollback.");
+ session.rollback();
} else {
- log.info("COMMIT");
+ log.info("Message Commit.");
message.acknowledge();
- consumer.commit();
+ session.commit();
}
} catch (JMSException e) {
- System.err.println("Error when rolling back transaction");
+ log.error("Error when rolling back transaction");
}
}
}
- private class SessionMessageListenerTest implements MessageListener {
- private Session session;
- public int counter = 0;
+ public void testQueueRollbackConsumerListener() throws JMSException {
+ connection.start();
+
+ Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue("queue-" + getName());
+ MessageProducer producer = createProducer(session, queue);
+ Message message = createTextMessage(session);
+ producer.send(message);
+ session.commit();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
+ mc.setRedeliveryPolicy(getRedeliveryPolicy());
+
+ TestMessageListener listener = new TestMessageListener(session);
+ consumer.setMessageListener(listener);
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
- public SessionMessageListenerTest(Session session) {
- this.session = session;
}
+ // first try
+ assertEquals(1, listener.counter);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
- public void onMessage(Message message) {
- try {
- log.info("Message: " + message);
- counter++;
- if (counter < 2) {
- log.info("ROLLBACK");
- session.rollback();
- } else {
- log.info("COMMIT");
- message.acknowledge();
- session.commit();
- }
- } catch (JMSException e) {
- System.err.println("Error when rolling back transaction");
- }
}
+ // second try (redelivery after 1 sec)
+ assertEquals(2, listener.counter);
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+
+ }
+ // third try (redelivery after 2 seconds) - it should give up after that
+ assertEquals(3, listener.counter);
+
+ // create new message
+ producer.send(createTextMessage(session));
+ session.commit();
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ // it should be committed, so no redelivery
+ assertEquals(4, listener.counter);
+
+ try {
+ Thread.sleep(1500);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ // no redelivery, counter should still be 4
+ assertEquals(4, listener.counter);
+
+ session.close();
}
- public void testQueueRollbackMessageListener() throws JMSException {
+ public void testQueueRollbackSessionListener() throws JMSException {
connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -134,25 +175,52 @@
ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
mc.setRedeliveryPolicy(getRedeliveryPolicy());
- SessionMessageListenerTest listener = new SessionMessageListenerTest(session);
+ TestMessageListener listener = new TestMessageListener(session);
consumer.setMessageListener(listener);
try {
- Thread.sleep(7000);
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+
+ }
+ // first try
+ assertEquals(1, listener.counter);
+
+ try {
+ Thread.sleep(1000);
} catch (InterruptedException e) {
}
+ // second try (redelivery after 1 sec)
assertEquals(2, listener.counter);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+
+ }
+ // third try (redelivery after 2 seconds) - it should give up after that
+ assertEquals(3, listener.counter);
+
+ // create new message
producer.send(createTextMessage(session));
session.commit();
try {
- Thread.sleep(2000);
+ Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
- assertEquals(3, listener.counter);
+ // it should be committed, so no redelivery
+ assertEquals(4, listener.counter);
+
+ try {
+ Thread.sleep(1500);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ // no redelivery, counter should still be 4
+ assertEquals(4, listener.counter);
session.close();
}