Author: gtully
Date: Fri Aug 6 08:10:52 2010
New Revision: 982903
URL: http://svn.apache.org/viewvc?rev=982903&view=rev
Log:
resolve: fix https://issues.apache.org/activemq/browse/AMQ-1847 - set
initialRedeliveryDelay=0 to get non delayed first redelivery, use deliveryDelay
to set the base for subsequent redelivery. existing users with an
initialRedeliveryDelay set will now see it being respected.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Fri Aug 6 08:10:52 2010
@@ -1099,11 +1099,13 @@ public class ActiveMQMessageConsumer imp
return;
}
- // Only increase the redelivery delay after the first
redelivery..
+ // use initial delay for first redelivery
MessageDispatch lastMd = deliveredMessages.getFirst();
final int currentRedeliveryCount =
lastMd.getMessage().getRedeliveryCounter();
if (currentRedeliveryCount > 0) {
- redeliveryDelay =
redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+ redeliveryDelay =
redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
+ } else {
+ redeliveryDelay =
redeliveryPolicy.getInitialRedeliveryDelay();
}
MessageId firstMsgId =
deliveredMessages.getLast().getMessage().getMessageId();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Fri Aug 6 08:10:52 2010
@@ -874,9 +874,9 @@ public class ActiveMQSession implements
// Figure out how long we should wait to resend
// this message.
- long redeliveryDelay = 0;
+ long redeliveryDelay =
redeliveryPolicy.getInitialRedeliveryDelay();
for (int i = 0; i < redeliveryCounter; i++) {
- redeliveryDelay =
redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+ redeliveryDelay =
redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
scheduler.executeAfterDelay(new Runnable() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
Fri Aug 6 08:10:52 2010
@@ -38,6 +38,7 @@ public class RedeliveryPolicy implements
private boolean useCollisionAvoidance;
private boolean useExponentialBackOff;
private double backOffMultiplier = 5.0;
+ private long redeliveryDelay = initialRedeliveryDelay;
public RedeliveryPolicy() {
}
@@ -82,15 +83,15 @@ public class RedeliveryPolicy implements
this.maximumRedeliveries = maximumRedeliveries;
}
- public long getRedeliveryDelay(long previousDelay) {
- long redeliveryDelay;
+ public long getNextRedeliveryDelay(long previousDelay) {
+ long nextDelay;
if (previousDelay == 0) {
- redeliveryDelay = initialRedeliveryDelay;
+ nextDelay = redeliveryDelay;
} else if (useExponentialBackOff && backOffMultiplier > 1) {
- redeliveryDelay = (long) (previousDelay * backOffMultiplier);
+ nextDelay = (long) (previousDelay * backOffMultiplier);
} else {
- redeliveryDelay = previousDelay;
+ nextDelay = previousDelay;
}
if (useCollisionAvoidance) {
@@ -100,10 +101,10 @@ public class RedeliveryPolicy implements
*/
Random random = getRandomNumberGenerator();
double variance = (random.nextBoolean() ? collisionAvoidanceFactor
: -collisionAvoidanceFactor) * random.nextDouble();
- redeliveryDelay += redeliveryDelay * variance;
+ nextDelay += nextDelay * variance;
}
- return redeliveryDelay;
+ return nextDelay;
}
public boolean isUseCollisionAvoidance() {
@@ -129,4 +130,11 @@ public class RedeliveryPolicy implements
return randomNumberGenerator;
}
+ public void setRedeliveryDelay(long redeliveryDelay) {
+ this.redeliveryDelay = redeliveryDelay;
+ }
+
+ public long getRedeliveryDelay() {
+ return redeliveryDelay;
+ }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
Fri Aug 6 08:10:52 2010
@@ -62,7 +62,8 @@ public class MessageListenerRedeliveryTe
protected RedeliveryPolicy getRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
- redeliveryPolicy.setInitialRedeliveryDelay(1000);
+ redeliveryPolicy.setInitialRedeliveryDelay(0);
+ redeliveryPolicy.setRedeliveryDelay(1000);
redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setBackOffMultiplier((short)2);
redeliveryPolicy.setUseExponentialBackOff(true);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
Fri Aug 6 08:10:52 2010
@@ -47,7 +47,8 @@ public class RedeliveryPolicyTest extend
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
- policy.setInitialRedeliveryDelay(500);
+ policy.setInitialRedeliveryDelay(0);
+ policy.setRedeliveryDelay(500);
policy.setBackOffMultiplier((short) 2);
policy.setUseExponentialBackOff(true);
@@ -102,8 +103,9 @@ public class RedeliveryPolicyTest extend
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
- policy.setInitialRedeliveryDelay(500);
-
+ policy.setInitialRedeliveryDelay(0);
+ policy.setRedeliveryDelay(500);
+
connection.start();
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue(getName());
@@ -303,5 +305,128 @@ public class RedeliveryPolicyTest extend
- }
+ }
+
+
+ public void testInitialRedeliveryDelayZero() throws Exception {
+
+ // Receive a message with the JMS API
+ RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(0);
+ policy.setUseExponentialBackOff(false);
+ policy.setMaximumRedeliveries(1);
+
+ connection.start();
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ producer.send(session.createTextMessage("1st"));
+ producer.send(session.createTextMessage("2nd"));
+ session.commit();
+
+ TextMessage m;
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+ session.rollback();
+
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ session.commit();
+
+ session.commit();
+ }
+
+
+ public void testInitialRedeliveryDelayOne() throws Exception {
+
+ // Receive a message with the JMS API
+ RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(1000);
+ policy.setUseExponentialBackOff(false);
+ policy.setMaximumRedeliveries(1);
+
+ connection.start();
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ producer.send(session.createTextMessage("1st"));
+ producer.send(session.createTextMessage("2nd"));
+ session.commit();
+
+ TextMessage m;
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+ session.rollback();
+
+ m = (TextMessage)consumer.receive(100);
+ assertNull(m);
+
+ m = (TextMessage)consumer.receive(2000);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ session.commit();
+ }
+
+ public void testRedeliveryDelayOne() throws Exception {
+
+ // Receive a message with the JMS API
+ RedeliveryPolicy policy = connection.getRedeliveryPolicy();
+ policy.setInitialRedeliveryDelay(0);
+ policy.setRedeliveryDelay(1000);
+ policy.setUseExponentialBackOff(false);
+ policy.setMaximumRedeliveries(2);
+
+ connection.start();
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ producer.send(session.createTextMessage("1st"));
+ producer.send(session.createTextMessage("2nd"));
+ session.commit();
+
+ TextMessage m;
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+ session.rollback();
+
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull("first immediate redelivery", m);
+ session.rollback();
+
+ m = (TextMessage)consumer.receive(100);
+ assertNull("second delivery delayed: " + m, m);
+
+ m = (TextMessage)consumer.receive(2000);
+ assertNotNull(m);
+ assertEquals("1st", m.getText());
+
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ assertEquals("2nd", m.getText());
+ session.commit();
+ }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java?rev=982903&r1=982902&r2=982903&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java
Fri Aug 6 08:10:52 2010
@@ -57,7 +57,7 @@ public class AMQ2021Test extends TestCas
AMQ2021Test testCase;
String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";
- String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND +
"?jms.redeliveryPolicy.maximumRedeliveries=1";
+ String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND +
"?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
private int numMessages = 1000;
private int numConsumers = 2;