Author: pero
Date: Sun Aug 19 14:04:15 2007
New Revision: 567470

URL: http://svn.apache.org/viewvc?rev=567470&view=rev
Log:
Recover the cluster membership heartbeat after a network interface goes down (fixes bug 40042).

Modified:
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
    
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
    tomcat/container/tc5.5.x/webapps/docs/changelog.xml
    tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java
 Sun Aug 19 14:04:15 2007
@@ -59,7 +59,7 @@
     /**
      * The descriptive information about this implementation.
      */
-    private static final String info = "McastService/2.1";
+    private static final String info = "McastService/2.2";
 
     /**
      * The implementation specific properties
@@ -310,7 +310,16 @@
                                     ttl,
                                     soTimeout,
                                     this);
-
+        String value = properties.getProperty("recoveryEnabled","true");
+        boolean recEnabled = Boolean.valueOf(value).booleanValue() ;
+        impl.setRecoveryEnabled(recEnabled);        
+        int recCnt = 
Integer.parseInt(properties.getProperty("recoveryCounter","10"));
+        impl.setRecoveryCounter(recCnt);
+        long recSlpTime = 
Long.parseLong(properties.getProperty("recoverySleepTime","5000"));
+        impl.setRecoverySleepTime(recSlpTime);
+        if(log.isDebugEnabled())
+            log.debug("Recovery Options (enabled=" + recEnabled + ",counter=" 
+recCnt+ ",time=" +recSlpTime+").");
+        
         impl.start(level);
                long memberwait = 
(Long.parseLong(properties.getProperty("msgFrequency"))*4);
         if(log.isInfoEnabled())
@@ -479,6 +488,36 @@
         properties.setProperty("mcastTTL", String.valueOf(mcastTTL));
     }
 
+    public int getRecoveryCounter() { // impl's value when impl exists, else the configured property (default 10)
+        if(impl != null)
+            return impl.getRecoveryCounter() ;
+        else return Integer.parseInt(properties.getProperty("recoveryCounter","10"));
+    }
+    
+    public boolean isRecoveryEnabled() { // impl's value when impl exists, else the configured property (default true)
+        if(impl != null)
+            return impl.isRecoveryEnabled() ;
+        else return Boolean.valueOf(properties.getProperty("recoveryEnabled","true")).booleanValue(); // parse the value; Boolean.getBoolean() would look up a JVM system property instead
+    }
+
+    public long getRecoverySleepTime() { // impl's value when impl exists, else the configured property (default 5000 ms)
+        if(impl != null)
+            return impl.getRecoverySleepTime() ;
+        else return Long.parseLong(properties.getProperty("recoverySleepTime","5000"));
+    }
+    
+    public void setRecoveryCounter(int recoveryCounter) { // stored in properties only; an existing impl is not updated
+        properties.setProperty("recoveryCounter", String.valueOf(recoveryCounter));
+    }
+
+    public void setRecoveryEnabled(boolean recoveryEnabled) { // stored in properties only; an existing impl is not updated
+        properties.setProperty("recoveryEnabled", String.valueOf(recoveryEnabled));
+    }
+
+    public void setRecoverySleepTime(long recoverySleepTime) { // stored in properties only; an existing impl is not updated
+        properties.setProperty("recoverySleepTime", String.valueOf(recoverySleepTime));
+    }
+
     /**
      * Simple test program
      * @param args Command-line arguments
@@ -501,4 +540,7 @@
         service.start();
         Thread.sleep(60*1000*60);
     }
+    
+    
+    
 }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java
 Sun Aug 19 14:04:15 2007
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.net.InetAddress ;
 import java.net.DatagramPacket;
+import java.net.SocketTimeoutException;
+
 import org.apache.catalina.cluster.MembershipListener;
 
 /**
@@ -33,6 +35,7 @@
  * Need to fix this, could use java.nio and only need one thread to send and 
receive, or
  * just use a timeout on the receive
  * @author Filip Hanik
+ * @author Peter Rossbach
  * @version $Revision$, $Date$
  */
 public class McastServiceImpl
@@ -102,6 +105,21 @@
     protected InetAddress mcastBindAddress = null;
 
     /**
+     * Number of consecutive send or receive failures after which a membership recovery is attempted.
+     */
+    protected int recoveryCounter = 10;
+    
+    /**
+     * Time in milliseconds the recovery thread sleeps between failed recovery attempts.
+     */
+    protected long recoverySleepTime = 5000;
+    
+    /**
+     * Whether a membership recovery may be started at all (checked before a recovery thread is started).
+     */
+    protected boolean recoveryEnabled = true;
+    
+    /**
      * Create a new mcast service impl
      * @param member - the local member
      * @param sendFrequency - the time (ms) in between pings sent out
@@ -129,6 +147,13 @@
         this.mcastSoTimeout = soTimeout;
         this.mcastTTL = ttl;
         this.mcastBindAddress = bind;
+        timeToExpiration = expireTime;
+        this.service = service;
+        this.sendFrequency = sendFrequency;
+        init();
+    }
+
+    protected void init() throws IOException {
         setupSocket();
         sendPacket = new DatagramPacket(new byte[1000],1000);
         sendPacket.setAddress(address);
@@ -136,27 +161,25 @@
         receivePacket = new DatagramPacket(new byte[1000],1000);
         receivePacket.setAddress(address);
         receivePacket.setPort(port);
-        membership = new McastMembership(member.getName());
-        timeToExpiration = expireTime;
-        this.service = service;
-        this.sendFrequency = sendFrequency;
+        if(membership == null) membership = new 
McastMembership(member.getName());
     }
     
     protected void setupSocket() throws IOException {
         if (mcastBindAddress != null) socket = new MulticastSocket(new 
java.net.
             InetSocketAddress(mcastBindAddress, port));
         else socket = new MulticastSocket(port);
+           socket.setLoopbackMode(false); //hint that we don't need loop back 
messages
         if (mcastBindAddress != null) {
                        if(log.isInfoEnabled())
                 log.info("Setting multihome multicast interface to:" +
                          mcastBindAddress);
             socket.setInterface(mcastBindAddress);
         } //end if
-        if ( mcastSoTimeout >= 0 ) {
-                       if(log.isInfoEnabled())
-                log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
-            socket.setSoTimeout(mcastSoTimeout);
-        }
+        //force a so timeout so that we don't block forever
+        if ( mcastSoTimeout <= 0 ) mcastSoTimeout = (int)sendFrequency;
+        if(log.isInfoEnabled())
+            log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout);
+        socket.setSoTimeout(mcastSoTimeout);
         if ( mcastTTL >= 0 ) {
                        if(log.isInfoEnabled())
                 log.info("Setting cluster mcast TTL to " + mcastTTL);
@@ -193,11 +216,17 @@
      * @throws IOException if the service fails to disconnect from the sockets
      */
     public synchronized void stop() throws IOException {
-        socket.leaveGroup(address);
-        doRun = false;
-        sender = null;
-        receiver = null;
-        serviceStartTime = Long.MAX_VALUE;
+        try {
+            socket.leaveGroup(address);
+        } catch (IOException ignore) {
+        } finally {
+            doRun = false;
+            if(sender!= null) sender.interrupt() ;
+            sender = null;
+            if(receiver!= null) receiver.interrupt() ;
+            receiver = null;
+            serviceStartTime = Long.MAX_VALUE;
+        }
     }
 
     /**
@@ -205,22 +234,37 @@
      * @throws IOException
      */
     public void receive() throws IOException {
-        socket.receive(receivePacket);
-        byte[] data = new byte[receivePacket.getLength()];
-        
System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length);
-        McastMember m = McastMember.getMember(data);
-        if(log.isDebugEnabled())
-            log.debug("Mcast receive ping from member " + m);
-        if ( membership.memberAlive(m) ) {
+        try {
+            socket.receive(receivePacket);
+
+            byte[] data = new byte[receivePacket.getLength()];
+            
System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length);
+            McastMember m = McastMember.getMember(data);
             if(log.isDebugEnabled())
-                log.debug("Mcast add member " + m);
-            service.memberAdded(m);
+                log.debug("Mcast receive ping from member " + m);
+            if ( membership.memberAlive(m) ) {
+                if(log.isDebugEnabled())
+                    log.debug("Mcast add member " + m);
+                service.memberAdded(m);
+            }
+        } finally {
+            checkExpire();
         }
-        McastMember[] expired = membership.expire(timeToExpiration);
-        for ( int i=0; i<expired.length; i++) {
-            if(log.isDebugEnabled())
-                log.debug("Mcast exipre  member " + m);
-            service.memberDisappeared(expired[i]);
+    }
+
+    protected Object expiredMutex = new Object(); // serialises checkExpire(), which is called from send(), receive() and the recovery thread
+
+    /**
+     * Expires members via membership.expire(timeToExpiration) and reports each expired member through service.memberDisappeared().
+     */
+    protected void checkExpire() {
+        synchronized (expiredMutex) {
+            McastMember[] expired = membership.expire(timeToExpiration);
+            for ( int i=0; i<expired.length; i++) {
+                if(log.isDebugEnabled())
+                    log.debug("Mcast exipre member " + expired[i]);
+                service.memberDisappeared(expired[i]);
+            }
         }
     }
 
@@ -229,55 +273,198 @@
      * @throws Exception
      */
     public void send() throws Exception{
-        member.inc();
-        if(log.isDebugEnabled())
-            log.debug("Mcast send ping from member " + member);
-        byte[] data = member.getData(this.serviceStartTime);
-        DatagramPacket p = new DatagramPacket(data,data.length);
-        p.setAddress(address);
-        p.setPort(port);
-        socket.send(p);
+        try {
+            member.inc();
+
+            if(log.isDebugEnabled())
+                log.debug("Mcast send ping from member " + member);
+            byte[] data = member.getData(this.serviceStartTime);
+            DatagramPacket p = new DatagramPacket(data,data.length);
+            p.setAddress(address);
+            p.setPort(port);
+            socket.send(p);
+        } finally {
+            checkExpire() ;
+        }
     }
 
     public long getServiceStartTime() {
        return this.serviceStartTime;
     }
 
+    public int getRecoveryCounter() {
+        return recoveryCounter;
+    }
+
+    public boolean isRecoveryEnabled() {
+        return recoveryEnabled;
+    }
+
+    public long getRecoverySleepTime() {
+        return recoverySleepTime;
+    }
 
+    public void setRecoveryCounter(int recoveryCounter) {
+        this.recoveryCounter = recoveryCounter;
+    }
+
+    public void setRecoveryEnabled(boolean recoveryEnabled) {
+        this.recoveryEnabled = recoveryEnabled;
+    }
+
+    public void setRecoverySleepTime(long recoverySleepTime) {
+        this.recoverySleepTime = recoverySleepTime;
+    }
+    
     public class ReceiverThread extends Thread {
+        
         public ReceiverThread() {
             super();
             setName("Cluster-MembershipReceiver");
         }
+        
         public void run() {
+            int errorCounter = 0 ;
             while ( doRun ) {
                 try {
                     receive();
+                    errorCounter = 0;
                 } catch ( Exception x ) {
-                    log.warn("Error receiving mcast package. Sleeping 
500ms",x);
-                    try { Thread.sleep(500); } catch ( Exception ignore ){}
-                    
+                    if (errorCounter==0) { 
+                        if(! (x instanceof SocketTimeoutException))
+                            log.warn("Error receiving mcast package 
(errorCounter=" +errorCounter+ "). Sleeping " +sendFrequency + " ms",x);
+                    } else {
+                        if(! (x instanceof SocketTimeoutException)
+                            && log.isDebugEnabled())
+                            log.debug("Error receiving mcast package 
(errorCounter=" +errorCounter+ "). Sleeping " +sendFrequency+ " ms",x);
+                    }
+                    try { Thread.sleep(sendFrequency); } catch ( Exception 
ignore ){}
+                    if ( (++errorCounter)>=recoveryCounter ) {
+                        log.warn("Error receiving mcast package 
(errorCounter=" +errorCounter+ "). Try Recovery!",x);
+                        errorCounter=0;
+                        new RecoveryThread(McastServiceImpl.this);
+                    }
                 }
             }
+            log.warn("Receiver Thread ends with errorCounter=" +errorCounter+ 
".");
+            
         }
-    }//class ReceiverThread
+    }
 
     public class SenderThread extends Thread {
+        
         long time;
+        
+        McastServiceImpl service ;
+        
         public SenderThread(long time) {
             this.time = time;
             setName("Cluster-MembershipSender");
 
         }
+        
         public void run() {
+            int errorCounter = 0 ;
             while ( doRun ) {
                 try {
                     send();
+                    errorCounter = 0;
                 } catch ( Exception x ) {
-                    log.warn("Unable to send mcast message.",x);
+                    if (errorCounter==0) {
+                        log.warn("Unable to send mcast message.",x);
+                    }
+                    else {
+                        if(log.isDebugEnabled())
+                            log.debug("Unable to send mcast message.",x);
+                    }
+                    if ( (++errorCounter)>=recoveryCounter ) {
+                        errorCounter=0;
+                        new RecoveryThread(McastServiceImpl.this);
+                     }
                 }
                 try { Thread.sleep(time); } catch ( Exception ignore ) {}
             }
+            log.warn("Sender Thread ends with errorCounter=" +errorCounter+ 
".");
+        }       
+    }
+    
+    protected static class RecoveryThread extends Thread {
+        // Stops and re-initialises the parent membership service; the running flag lets only one instance run at a time.
+        static volatile boolean running = false; // cleared in run() without holding the class lock, so it must be volatile
+        
+        McastServiceImpl parent = null;
+       
+        public RecoveryThread(McastServiceImpl parent) {
+            this.parent = parent;
+            if (!init(this)) this.parent = null; // not started: release the field (assigning the parameter was a no-op)
+        }
+        // Starts t unless a recovery is already running or recovery is disabled; returns true if t was started.
+        public static synchronized boolean init(RecoveryThread t) {
+            if ( running ) {
+                return false;
+            }
+            if ( !t.parent.isRecoveryEnabled()) {
+                return false;
+            }
+            running = true;
+            t.setName("Cluster-MembershipRecovery");
+            t.setDaemon(true);
+            t.start();
+            return true;
+        }
+        // Stops the parent service; logs and returns false if stop() throws.
+        public boolean stopService() {
+            try {
+                parent.stop();
+                return true;
+            } catch (Exception x) {
+                log.warn("Recovery thread failed to stop membership service.", x);
+                return false;
+            }
         }
-    }//class SenderThread
+        // Re-runs parent.init(), parent.start(1) and parent.start(2); logs and returns false if any of them throws.
+        public boolean startService() {
+            try {
+                parent.init();
+                parent.start(1);
+                parent.start(2);
+                return true;
+            } catch (Exception x) {
+                log.warn("Recovery thread failed to start membership service.", x);
+                return false;
+            }
+        }
+        // Retries stop/start until both succeed, sleeping recoverySleepTime and expiring members between failed attempts.
+        public void run() {
+            boolean success = false;
+            int attempt = 0;
+            try {
+                while (!success) {
+                    if(log.isInfoEnabled())
+                        log.info("Cluster membership, running recovery thread, multicasting is not functional.");
+                    success = stopService();
+                    if(success) {
+                        try {
+                            Thread.sleep(1000 + parent.mcastSoTimeout);
+                        } catch (Exception ignore){}
+                        success = startService();
+                        if(success && log.isInfoEnabled())
+                            log.info("Membership recovery was successful.");
+                    }
+                    try {
+                        if (!success) {
+                            if(log.isInfoEnabled())
+                                log.info("Recovery attempt " + (++attempt) + " failed, trying again in " +parent.recoverySleepTime + " milliseconds");
+                            Thread.sleep(parent.recoverySleepTime);
+                            // keep expiring members while multicast is not working
+                            parent.checkExpire() ;
+                       }
+                    }catch (InterruptedException ignore) {
+                    }
+                }
+            } finally {
+                running = false;
+            }
+        }
+    }
 }

Modified: 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
 (original)
+++ 
tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml
 Sun Aug 19 14:04:15 2007
@@ -53,6 +53,16 @@
     <attribute   name="mcastTTL"
           description=""
                  type="int"/>
+    <attribute   name="recoveryCounter"
+          description="Number of consecutive membership failures before the socket is restarted (default 10)"
+                 type="int"/>
+    <attribute   name="recoveryEnabled"
+          description="Membership recovery enabled (default true)"
+                   is="true"
+                 type="boolean"/>
+    <attribute   name="recoverySleepTime"
+          description="Sleep time in milliseconds between socket recovery attempts (default 5000)"
+                 type="long"/> 
     <attribute   name="localMemberName"
           description="Complete local receiver information"
                  type="java.lang.String"

Modified: tomcat/container/tc5.5.x/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/changelog.xml?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/webapps/docs/changelog.xml (original)
+++ tomcat/container/tc5.5.x/webapps/docs/changelog.xml Sun Aug 19 14:04:15 2007
@@ -135,6 +135,9 @@
   <subsection name="Cluster">
    <changelog>
      <fix>
+        <bug>40042</bug>: Recover the membership heartbeat after a network interface goes down. (pero)
+     </fix>
+     <fix>
         <bug>42691</bug>: Don't set access time after session sync. Fix that 
sessions
         after node restart better expire. Requested by Casey Lucas (pero)
      </fix>   

Modified: tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml?rev=567470&r1=567469&r2=567470&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml (original)
+++ tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml Sun Aug 19 14:04:15 
2007
@@ -497,13 +497,16 @@
                                  mcastClusterDomain="d10" 
                                           mcastPort="45564"
                                      mcastFrequency="1000"
-                                      mcastDropTime="30000"/&gt;
+                                      mcastDropTime="30000"
+                                    recoveryCounter="10"
+                                    recoveryEnabled="true"
+                                  recoverySleepTime="5000"/&gt;
                   &lt;Receiver 
                                            
className="org.apache.catalina.cluster.tcp.ReplicationListener"
                                     tcpListenAddress="auto"
                                        tcpListenPort="9015"
                                   tcpSelectorTimeout="100"
-                                      tcpThreadCount="6"
+                                      tcpThreadCount="6"/&gt;
                   &lt;Sender
                                            
className="org.apache.catalina.cluster.tcp.ReplicationTransmitter"
                                      replicationMode="fastasyncqueue"



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to