Author: jstrachan
Date: Mon Jun 19 04:55:44 2006
New Revision: 415300
URL: http://svn.apache.org/viewvc?rev=415300&view=rev
Log:
applied patch from Christopher G. Stach II for AMQ-747 to allow redelivery
backoff to add a collision avoidence capability. Many thanks!
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=415300&r1=415299&r2=415300&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Jun 19 04:55:44 2006
@@ -754,12 +754,7 @@
// stop the delivery of messages.
unconsumedMessages.stop();
// Start up the delivery again a little later.
- if(redeliveryDelay==0){
-
redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay();
- }else{
- if(redeliveryPolicy.isUseExponentialBackOff())
-
redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier();
- }
+ redeliveryDelay =
redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
Scheduler.executeAfterDelay(new Runnable(){
public void run(){
try{
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=415300&r1=415299&r2=415300&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Mon Jun 19 04:55:44 2006
@@ -724,12 +724,7 @@
// Figure out how long we should wait to
resend this message.
long redeliveryDelay=0;
for( int i=0; i < redeliveryCounter; i++) {
- if (redeliveryDelay == 0) {
- redeliveryDelay =
redeliveryPolicy.getInitialRedeliveryDelay();
- } else {
- if
(redeliveryPolicy.isUseExponentialBackOff())
- redeliveryDelay *=
redeliveryPolicy.getBackOffMultiplier();
- }
+ redeliveryDelay =
redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
}
Scheduler.executeAfterDelay(new Runnable() {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=415300&r1=415299&r2=415300&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
Mon Jun 19 04:55:44 2006
@@ -17,6 +17,7 @@
package org.apache.activemq;
import java.io.Serializable;
+import java.util.Random;
/**
* Configuration options used to control how messages are re-delivered when
they
@@ -26,8 +27,12 @@
*/
public class RedeliveryPolicy implements Cloneable, Serializable {
+ // +/-15% for a 30% spread -cgs
+ protected double collisionAvoidanceFactor = 0.15d;
protected int maximumRedeliveries = 5;
protected long initialRedeliveryDelay = 1000L;
+ protected static Random randomNumberGenerator;
+ protected boolean useCollisionAvoidance = false;
protected boolean useExponentialBackOff = false;
protected short backOffMultiplier = 5;
@@ -51,6 +56,14 @@
this.backOffMultiplier = backOffMultiplier;
}
+ public short getCollisionAvoidancePercent() {
+ return (short) Math.round(collisionAvoidanceFactor * 100);
+ }
+
+ public void setCollisionAvoidancePercent(short collisionAvoidancePercent) {
+ this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
+ }
+
public long getInitialRedeliveryDelay() {
return initialRedeliveryDelay;
}
@@ -67,6 +80,41 @@
this.maximumRedeliveries = maximumRedeliveries;
}
+ public long getRedeliveryDelay(long previousDelay) {
+ long redeliveryDelay;
+
+ if (previousDelay == 0) {
+ redeliveryDelay = initialRedeliveryDelay;
+ } else if (useExponentialBackOff && backOffMultiplier > 1) {
+ redeliveryDelay = previousDelay * backOffMultiplier;
+ } else {
+ redeliveryDelay = previousDelay;
+ }
+
+ if (useCollisionAvoidance) {
+ if (randomNumberGenerator == null) {
+ initRandomNumberGenerator();
+ }
+
+ /*
+ * First random determines +/-, second random determines how far to
+ * go in that direction. -cgs
+ */
+ double variance = (randomNumberGenerator.nextBoolean() ?
collisionAvoidanceFactor : -collisionAvoidanceFactor) *
randomNumberGenerator.nextDouble();
+ redeliveryDelay += redeliveryDelay * variance;
+ }
+
+ return redeliveryDelay;
+ }
+
+ public boolean isUseCollisionAvoidance() {
+ return useCollisionAvoidance;
+ }
+
+ public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
+ this.useCollisionAvoidance = useCollisionAvoidance;
+ }
+
public boolean isUseExponentialBackOff() {
return useExponentialBackOff;
}
@@ -74,4 +122,11 @@
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
this.useExponentialBackOff = useExponentialBackOff;
}
+
+ protected static synchronized void initRandomNumberGenerator() {
+ if (randomNumberGenerator == null) {
+ randomNumberGenerator = new Random();
+ }
+ }
+
}