Author: jstrachan
Date: Mon Jun 19 04:55:44 2006
New Revision: 415300

URL: http://svn.apache.org/viewvc?rev=415300&view=rev
Log:
applied patch from Christopher G. Stach II for AMQ-747 to add a collision 
avoidance capability to redelivery backoff. Many thanks!

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=415300&r1=415299&r2=415300&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 Mon Jun 19 04:55:44 2006
@@ -754,12 +754,7 @@
                 // stop the delivery of messages.
                 unconsumedMessages.stop();
                 // Start up the delivery again a little later.
-                if(redeliveryDelay==0){
-                    
redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay();
-                }else{
-                    if(redeliveryPolicy.isUseExponentialBackOff())
-                        
redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier();
-                }
+                redeliveryDelay = 
redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                 Scheduler.executeAfterDelay(new Runnable(){
                     public void run(){
                         try{

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=415300&r1=415299&r2=415300&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
 Mon Jun 19 04:55:44 2006
@@ -724,12 +724,7 @@
                                 // Figure out how long we should wait to 
resend this message.
                                 long redeliveryDelay=0;
                                 for( int i=0; i < redeliveryCounter; i++) {
-                                    if (redeliveryDelay == 0) {
-                                        redeliveryDelay = 
redeliveryPolicy.getInitialRedeliveryDelay();
-                                    } else {
-                                        if 
(redeliveryPolicy.isUseExponentialBackOff())
-                                            redeliveryDelay *= 
redeliveryPolicy.getBackOffMultiplier();
-                                    }
+                                    redeliveryDelay = 
redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                                 }
                                 
                                 Scheduler.executeAfterDelay(new Runnable() {

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=415300&r1=415299&r2=415300&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
 Mon Jun 19 04:55:44 2006
@@ -17,6 +17,7 @@
 package org.apache.activemq;
 
 import java.io.Serializable;
+import java.util.Random;
 
 /**
  * Configuration options used to control how messages are re-delivered when 
they
@@ -26,8 +27,12 @@
  */
 public class RedeliveryPolicy implements Cloneable, Serializable {
 
+    // +/-15% for a 30% spread -cgs
+    protected double collisionAvoidanceFactor = 0.15d;
     protected int maximumRedeliveries = 5;
     protected long initialRedeliveryDelay = 1000L;
+    protected static Random randomNumberGenerator;
+    protected boolean useCollisionAvoidance = false;
     protected boolean useExponentialBackOff = false;
     protected short backOffMultiplier = 5;
 
@@ -51,6 +56,14 @@
         this.backOffMultiplier = backOffMultiplier;
     }
 
+    public short getCollisionAvoidancePercent() {
+        return (short) Math.round(collisionAvoidanceFactor * 100);
+    }
+
+    public void setCollisionAvoidancePercent(short collisionAvoidancePercent) {
+        this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
+    }
+
     public long getInitialRedeliveryDelay() {
         return initialRedeliveryDelay;
     }
@@ -67,6 +80,41 @@
         this.maximumRedeliveries = maximumRedeliveries;
     }
 
+    public long getRedeliveryDelay(long previousDelay) {
+        long redeliveryDelay;
+
+        if (previousDelay == 0) {
+            redeliveryDelay = initialRedeliveryDelay;
+        } else if (useExponentialBackOff && backOffMultiplier > 1) {
+            redeliveryDelay = previousDelay * backOffMultiplier;
+        } else {
+            redeliveryDelay = previousDelay;
+        }
+
+        if (useCollisionAvoidance) {
+            if (randomNumberGenerator == null) {
+                initRandomNumberGenerator();
+            }
+
+            /* 
+             * First random determines +/-, second random determines how far to
+             * go in that direction. -cgs
+             */
+            double variance = (randomNumberGenerator.nextBoolean() ? 
collisionAvoidanceFactor : -collisionAvoidanceFactor) * 
randomNumberGenerator.nextDouble();
+            redeliveryDelay += redeliveryDelay * variance;
+        }
+
+        return redeliveryDelay;
+    }
+
+    public boolean isUseCollisionAvoidance() {
+        return useCollisionAvoidance;
+    }
+
+    public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
+        this.useCollisionAvoidance = useCollisionAvoidance;
+    }
+
     public boolean isUseExponentialBackOff() {
         return useExponentialBackOff;
     }
@@ -74,4 +122,11 @@
     public void setUseExponentialBackOff(boolean useExponentialBackOff) {
         this.useExponentialBackOff = useExponentialBackOff;
     }
+
+    protected static synchronized void initRandomNumberGenerator() {
+        if (randomNumberGenerator == null) {
+            randomNumberGenerator = new Random();
+        }
+    }
+
 }


Reply via email to