Author: jlim Date: Sat Nov 18 05:35:51 2006 New Revision: 476523 URL: http://svn.apache.org/viewvc?view=rev&rev=476523 Log: applied patch for http://issues.apache.org/activemq/browse/AMQ-967
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=476523&r1=476522&r2=476523 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Sat Nov 18 05:35:51 2006 @@ -802,7 +802,8 @@ redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); rollbackCounter++; - if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){ + if(redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES + && rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){ // We need to NACK the messages so that they get sent to the // DLQ. // Acknowledge the last message. Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=476523&r1=476522&r2=476523 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Sat Nov 18 05:35:51 2006 @@ -714,7 +714,8 @@ RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); int redeliveryCounter = md.getMessage().getRedeliveryCounter(); - if (redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { + if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES + && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) { // We need to NACK the messages so that they get sent to the // DLQ. Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?view=diff&rev=476523&r1=476522&r2=476523 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java Sat Nov 18 05:35:51 2006 @@ -30,6 +30,8 @@ */ public class RedeliveryPolicy implements Cloneable, Serializable { + public static final int NO_MAXIMUM_REDELIVERIES = -1; + // +/-15% for a 30% spread -cgs protected double collisionAvoidanceFactor = 0.15d; protected int maximumRedeliveries = 6; Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?view=diff&rev=476523&r1=476522&r2=476523 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Sat Nov 18 05:35:51 2006 @@ -199,4 +199,110 @@ session.commit(); } + + + /** + * @throws Exception + */ + public void testInfiniteMaximumNumberOfRedeliveries() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(100); + policy.setUseExponentialBackOff(false); + // let's set the maximum redeliveries to no maximum (ie. infinite) + policy.setMaximumRedeliveries(-1); + + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + //we should be able to get the 1st message redelivered until a session.commit is called + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.commit(); + + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + + } + + /** + * @throws Exception + */ + public void testZeroMaximumNumberOfRedeliveries() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(100); + policy.setUseExponentialBackOff(false); + //let's set the maximum redeliveries to 0 + policy.setMaximumRedeliveries(0); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + //the 1st message should not be redelivered since maximumRedeliveries is set to 0 + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + + + + } }