Author: chirino
Date: Mon Jul 10 21:38:18 2006
New Revision: 420718

URL: http://svn.apache.org/viewvc?rev=420718&view=rev
Log:
Added reconnect logic.
http://issues.apache.org/activemq/browse/AMQ-803

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=420718&r1=420717&r2=420718&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
 Mon Jul 10 21:38:18 2006
@@ -23,6 +23,8 @@
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryListener;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * A simple DiscoveryAgent that allows static configuration of the discovered 
services.
  * 
@@ -30,9 +32,31 @@
  */
 public class SimpleDiscoveryAgent implements DiscoveryAgent {
     
+    private long initialReconnectDelay = 1000*5;
+    private long maxReconnectDelay = 1000 * 30;
+    private long backOffMultiplier = 2;
+    private boolean useExponentialBackOff = false;
+    private int maxReconnectAttempts;
+    private final Object sleepMutex = new Object();
+    private long minConnectTime = 500;
+
     private DiscoveryListener listener;
     String services[] = new String[] {};
     String group = "DEFAULT";
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    
+    class SimpleDiscoveryEvent extends DiscoveryEvent {
+               
+       private int connectFailures;
+        private long reconnectDelay = initialReconnectDelay;
+        private long connectTime = System.currentTimeMillis();
+        private AtomicBoolean failed = new AtomicBoolean(false);
+
+        public SimpleDiscoveryEvent(String service) {
+                       super(service);
+               }
+        
+    }
     
     public void setDiscoveryListener(DiscoveryListener listener) {
         this.listener = listener;
@@ -42,12 +66,17 @@
     }
     
     public void start() throws Exception {
+       running.set(true);
         for (int i = 0; i < services.length; i++) {
-            listener.onServiceAdd(new DiscoveryEvent(services[i]));
+            listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
         }
     }
     
     public void stop() throws Exception {
+       running.set(false);
+       synchronized(sleepMutex) {
+               sleepMutex.notifyAll();
+       }
     }
   
     public String[] getServices() {
@@ -80,7 +109,112 @@
     public void setBrokerName(String brokerName) {
     }
 
-    public void serviceFailed(DiscoveryEvent event) throws IOException {
+    public void serviceFailed(DiscoveryEvent devent) throws IOException {
+       
+        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent;
+        if( event.failed.compareAndSet(false, true) ) {
+               
+                       listener.onServiceRemove(event);
+               Thread thread = new Thread() {
+                       public void run() {
+       
+       
+                               // We detect a failed connection attempt 
because the service fails right
+                               // away.
+                               if( event.connectTime + minConnectTime > 
System.currentTimeMillis()  ) {
+                                       
+                                       event.connectFailures++;
+                                       
+                                       if( maxReconnectAttempts>0 &&  
event.connectFailures >= maxReconnectAttempts ) {
+                                               // Don' try to re-connect
+                                               return;
+                                       }
+                                       
+                               synchronized(sleepMutex){
+                                   try{
+                                       if( !running.get() )
+                                               return;
+                                       
+                                       sleepMutex.wait(event.reconnectDelay);
+                                   }catch(InterruptedException ie){
+                                      return;
+                                   }
+                               }
+       
+                               if (!useExponentialBackOff) {
+                                   event.reconnectDelay = 
initialReconnectDelay;
+                               } else {
+                                   // Exponential increment of reconnect delay.
+                                   event.reconnectDelay*=backOffMultiplier;
+                                   if(event.reconnectDelay>maxReconnectDelay)
+                                       event.reconnectDelay=maxReconnectDelay;
+                               }
+                               
+                               } else {
+                                       event.connectFailures = 0;
+                           event.reconnectDelay = initialReconnectDelay;
+                               }
+                                                                       
+                       if( !running.get() )
+                               return;
+                       
+                               event.connectTime = System.currentTimeMillis();
+                               event.failed.set(false);
+                               
+                               listener.onServiceAdd(event);
+                       }
+               };
+               thread.setDaemon(true);
+               thread.start();
+        }
     }
+
+       public long getBackOffMultiplier() {
+               return backOffMultiplier;
+       }
+
+       public void setBackOffMultiplier(long backOffMultiplier) {
+               this.backOffMultiplier = backOffMultiplier;
+       }
+
+       public long getInitialReconnectDelay() {
+               return initialReconnectDelay;
+       }
+
+       public void setInitialReconnectDelay(long initialReconnectDelay) {
+               this.initialReconnectDelay = initialReconnectDelay;
+       }
+
+       public int getMaxReconnectAttempts() {
+               return maxReconnectAttempts;
+       }
+
+       public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+               this.maxReconnectAttempts = maxReconnectAttempts;
+       }
+
+       public long getMaxReconnectDelay() {
+               return maxReconnectDelay;
+       }
+
+       public void setMaxReconnectDelay(long maxReconnectDelay) {
+               this.maxReconnectDelay = maxReconnectDelay;
+       }
+
+       public long getMinConnectTime() {
+               return minConnectTime;
+       }
+
+       public void setMinConnectTime(long minConnectTime) {
+               this.minConnectTime = minConnectTime;
+       }
+
+       public boolean isUseExponentialBackOff() {
+               return useExponentialBackOff;
+       }
+
+       public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+               this.useExponentialBackOff = useExponentialBackOff;
+       }
     
 }


Reply via email to