Author: pero Date: Sun Aug 19 14:04:15 2007 New Revision: 567470 URL: http://svn.apache.org/viewvc?rev=567470&view=rev Log: Recover the membership heartbeat after a network interface goes down (fixes Bug 40042).
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml tomcat/container/tc5.5.x/webapps/docs/changelog.xml tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java?rev=567470&r1=567469&r2=567470&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastService.java Sun Aug 19 14:04:15 2007 @@ -59,7 +59,7 @@ /** * The descriptive information about this implementation. 
*/ - private static final String info = "McastService/2.1"; + private static final String info = "McastService/2.2"; /** * The implementation specific properties @@ -310,7 +310,16 @@ ttl, soTimeout, this); - + String value = properties.getProperty("recoveryEnabled","true"); + boolean recEnabled = Boolean.valueOf(value).booleanValue() ; + impl.setRecoveryEnabled(recEnabled); + int recCnt = Integer.parseInt(properties.getProperty("recoveryCounter","10")); + impl.setRecoveryCounter(recCnt); + long recSlpTime = Long.parseLong(properties.getProperty("recoverySleepTime","5000")); + impl.setRecoverySleepTime(recSlpTime); + if(log.isDebugEnabled()) + log.debug("Recovery Options (enabled=" + recEnabled + ",counter=" +recCnt+ ",time=" +recSlpTime+")."); + impl.start(level); long memberwait = (Long.parseLong(properties.getProperty("msgFrequency"))*4); if(log.isInfoEnabled()) @@ -479,6 +488,36 @@ properties.setProperty("mcastTTL", String.valueOf(mcastTTL)); } + public int getRecoveryCounter() { + if(impl != null) + return impl.getRecoveryCounter() ; + else return Integer.parseInt(properties.getProperty("recoveryCounter","10")); + } + + public boolean isRecoveryEnabled() { + if(impl != null) + return impl.isRecoveryEnabled() ; + else return Boolean.getBoolean(properties.getProperty("recoveryEnabled","true")); + } + + public long getRecoverySleepTime() { + if(impl != null) + return impl.getRecoverySleepTime() ; + else return Long.parseLong(properties.getProperty("recoverySleepTime","5000")); + } + + public void setRecoveryCounter(int recoveryCounter) { + properties.setProperty("recoveryCounter", String.valueOf(recoveryCounter)); + } + + public void setRecoveryEnabled(boolean recoveryEnabled) { + properties.setProperty("recoveryEnabled", String.valueOf(recoveryEnabled)); + } + + public void setRecoverySleepTime(long recoverySleepTime) { + properties.setProperty("recoverySleepTime", String.valueOf(recoverySleepTime)); + } + /** * Simple test program * @param args Command-line 
arguments @@ -501,4 +540,7 @@ service.start(); Thread.sleep(60*1000*60); } + + + } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java?rev=567470&r1=567469&r2=567470&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/McastServiceImpl.java Sun Aug 19 14:04:15 2007 @@ -22,6 +22,8 @@ import java.io.IOException; import java.net.InetAddress ; import java.net.DatagramPacket; +import java.net.SocketTimeoutException; + import org.apache.catalina.cluster.MembershipListener; /** @@ -33,6 +35,7 @@ * Need to fix this, could use java.nio and only need one thread to send and receive, or * just use a timeout on the receive * @author Filip Hanik + * @author Peter Rossbach * @version $Revision$, $Date$ */ public class McastServiceImpl @@ -102,6 +105,21 @@ protected InetAddress mcastBindAddress = null; /** + * nr of times the system has to fail before a recovery is initiated + */ + protected int recoveryCounter = 10; + + /** + * The time the recovery thread sleeps between recovery attempts + */ + protected long recoverySleepTime = 5000; + + /** + * Add the ability to turn on/off recovery + */ + protected boolean recoveryEnabled = true; + + /** * Create a new mcast service impl * @param member - the local member * @param sendFrequency - the time (ms) in between pings sent out @@ -129,6 +147,13 @@ this.mcastSoTimeout = soTimeout; this.mcastTTL = ttl; this.mcastBindAddress = bind; + timeToExpiration = expireTime; + this.service = service; + this.sendFrequency = sendFrequency; + init(); + } + + protected void init() throws IOException { 
setupSocket(); sendPacket = new DatagramPacket(new byte[1000],1000); sendPacket.setAddress(address); @@ -136,27 +161,25 @@ receivePacket = new DatagramPacket(new byte[1000],1000); receivePacket.setAddress(address); receivePacket.setPort(port); - membership = new McastMembership(member.getName()); - timeToExpiration = expireTime; - this.service = service; - this.sendFrequency = sendFrequency; + if(membership == null) membership = new McastMembership(member.getName()); } protected void setupSocket() throws IOException { if (mcastBindAddress != null) socket = new MulticastSocket(new java.net. InetSocketAddress(mcastBindAddress, port)); else socket = new MulticastSocket(port); + socket.setLoopbackMode(false); //hint that we don't need loop back messages if (mcastBindAddress != null) { if(log.isInfoEnabled()) log.info("Setting multihome multicast interface to:" + mcastBindAddress); socket.setInterface(mcastBindAddress); } //end if - if ( mcastSoTimeout >= 0 ) { - if(log.isInfoEnabled()) - log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout); - socket.setSoTimeout(mcastSoTimeout); - } + //force a so timeout so that we don't block forever + if ( mcastSoTimeout <= 0 ) mcastSoTimeout = (int)sendFrequency; + if(log.isInfoEnabled()) + log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout); + socket.setSoTimeout(mcastSoTimeout); if ( mcastTTL >= 0 ) { if(log.isInfoEnabled()) log.info("Setting cluster mcast TTL to " + mcastTTL); @@ -193,11 +216,17 @@ * @throws IOException if the service fails to disconnect from the sockets */ public synchronized void stop() throws IOException { - socket.leaveGroup(address); - doRun = false; - sender = null; - receiver = null; - serviceStartTime = Long.MAX_VALUE; + try { + socket.leaveGroup(address); + } catch (IOException ignore) { + } finally { + doRun = false; + if(sender!= null) sender.interrupt() ; + sender = null; + if(receiver!= null) receiver.interrupt() ; + receiver = null; + serviceStartTime = Long.MAX_VALUE; + } } 
/** @@ -205,22 +234,37 @@ * @throws IOException */ public void receive() throws IOException { - socket.receive(receivePacket); - byte[] data = new byte[receivePacket.getLength()]; - System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length); - McastMember m = McastMember.getMember(data); - if(log.isDebugEnabled()) - log.debug("Mcast receive ping from member " + m); - if ( membership.memberAlive(m) ) { + try { + socket.receive(receivePacket); + + byte[] data = new byte[receivePacket.getLength()]; + System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length); + McastMember m = McastMember.getMember(data); if(log.isDebugEnabled()) - log.debug("Mcast add member " + m); - service.memberAdded(m); + log.debug("Mcast receive ping from member " + m); + if ( membership.memberAlive(m) ) { + if(log.isDebugEnabled()) + log.debug("Mcast add member " + m); + service.memberAdded(m); + } + } finally { + checkExpire(); } - McastMember[] expired = membership.expire(timeToExpiration); - for ( int i=0; i<expired.length; i++) { - if(log.isDebugEnabled()) - log.debug("Mcast exipre member " + m); - service.memberDisappeared(expired[i]); + } + + protected Object expiredMutex = new Object(); + + /** + * check member exipre or alive + */ + protected void checkExpire() { + synchronized (expiredMutex) { + McastMember[] expired = membership.expire(timeToExpiration); + for ( int i=0; i<expired.length; i++) { + if(log.isDebugEnabled()) + log.debug("Mcast exipre member " + expired[i]); + service.memberDisappeared(expired[i]); + } } } @@ -229,55 +273,198 @@ * @throws Exception */ public void send() throws Exception{ - member.inc(); - if(log.isDebugEnabled()) - log.debug("Mcast send ping from member " + member); - byte[] data = member.getData(this.serviceStartTime); - DatagramPacket p = new DatagramPacket(data,data.length); - p.setAddress(address); - p.setPort(port); - socket.send(p); + try { + member.inc(); + + if(log.isDebugEnabled()) + 
log.debug("Mcast send ping from member " + member); + byte[] data = member.getData(this.serviceStartTime); + DatagramPacket p = new DatagramPacket(data,data.length); + p.setAddress(address); + p.setPort(port); + socket.send(p); + } finally { + checkExpire() ; + } } public long getServiceStartTime() { return this.serviceStartTime; } + public int getRecoveryCounter() { + return recoveryCounter; + } + + public boolean isRecoveryEnabled() { + return recoveryEnabled; + } + + public long getRecoverySleepTime() { + return recoverySleepTime; + } + public void setRecoveryCounter(int recoveryCounter) { + this.recoveryCounter = recoveryCounter; + } + + public void setRecoveryEnabled(boolean recoveryEnabled) { + this.recoveryEnabled = recoveryEnabled; + } + + public void setRecoverySleepTime(long recoverySleepTime) { + this.recoverySleepTime = recoverySleepTime; + } + public class ReceiverThread extends Thread { + public ReceiverThread() { super(); setName("Cluster-MembershipReceiver"); } + public void run() { + int errorCounter = 0 ; while ( doRun ) { try { receive(); + errorCounter = 0; } catch ( Exception x ) { - log.warn("Error receiving mcast package. Sleeping 500ms",x); - try { Thread.sleep(500); } catch ( Exception ignore ){} - + if (errorCounter==0) { + if(! (x instanceof SocketTimeoutException)) + log.warn("Error receiving mcast package (errorCounter=" +errorCounter+ "). Sleeping " +sendFrequency + " ms",x); + } else { + if(! (x instanceof SocketTimeoutException) + && log.isDebugEnabled()) + log.debug("Error receiving mcast package (errorCounter=" +errorCounter+ "). Sleeping " +sendFrequency+ " ms",x); + } + try { Thread.sleep(sendFrequency); } catch ( Exception ignore ){} + if ( (++errorCounter)>=recoveryCounter ) { + log.warn("Error receiving mcast package (errorCounter=" +errorCounter+ "). 
Try Recovery!",x); + errorCounter=0; + new RecoveryThread(McastServiceImpl.this); + } } } + log.warn("Receiver Thread ends with errorCounter=" +errorCounter+ "."); + } - }//class ReceiverThread + } public class SenderThread extends Thread { + long time; + + McastServiceImpl service ; + public SenderThread(long time) { this.time = time; setName("Cluster-MembershipSender"); } + public void run() { + int errorCounter = 0 ; while ( doRun ) { try { send(); + errorCounter = 0; } catch ( Exception x ) { - log.warn("Unable to send mcast message.",x); + if (errorCounter==0) { + log.warn("Unable to send mcast message.",x); + } + else { + if(log.isDebugEnabled()) + log.debug("Unable to send mcast message.",x); + } + if ( (++errorCounter)>=recoveryCounter ) { + errorCounter=0; + new RecoveryThread(McastServiceImpl.this); + } } try { Thread.sleep(time); } catch ( Exception ignore ) {} } + log.warn("Sender Thread ends with errorCounter=" +errorCounter+ "."); + } + } + + protected static class RecoveryThread extends Thread { + + static boolean running = false; + + McastServiceImpl parent = null; + + public RecoveryThread(McastServiceImpl parent) { + this.parent = parent; + if (!init(this)) parent = null; + } + + public static synchronized boolean init(RecoveryThread t) { + if ( running ) { + return false; + } + if ( !t.parent.isRecoveryEnabled()) { + return false; + } + running = true; + t.setName("Cluster-MembershipRecovery"); + t.setDaemon(true); + t.start(); + return true; + } + + public boolean stopService() { + try { + parent.stop(); + return true; + } catch (Exception x) { + log.warn("Recovery thread failed to stop membership service.", x); + return false; + } } - }//class SenderThread + + public boolean startService() { + try { + parent.init(); + parent.start(1); + parent.start(2); + return true; + } catch (Exception x) { + log.warn("Recovery thread failed to start membership service.", x); + return false; + } + } + + public void run() { + boolean success = false; + int 
attempt = 0; + try { + while (!success) { + if(log.isInfoEnabled()) + log.info("Cluster membership, running recovery thread, multicasting is not functional."); + success = stopService(); + if(success) { + try { + Thread.sleep(1000 + parent.mcastSoTimeout); + } catch (Exception ignore){} + success = startService(); + if(success && log.isInfoEnabled()) + log.info("Membership recovery was successful."); + } + try { + if (!success) { + if(log.isInfoEnabled()) + log.info("Recovery attempt " + (++attempt) + " failed, trying again in " +parent.recoverySleepTime + " milliseconds"); + Thread.sleep(parent.recoverySleepTime); + // check member expire... + parent.checkExpire() ; + } + }catch (InterruptedException ignore) { + } + } + } finally { + running = false; + } + } + } } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml?rev=567470&r1=567469&r2=567470&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/mcast/mbeans-descriptors.xml Sun Aug 19 14:04:15 2007 @@ -53,6 +53,16 @@ <attribute name="mcastTTL" description="" type="int"/> + <attribute name="recoveryCounter" + description="Counter after membership failure socket restarted (default 10)" + type="int"/> + <attribute name="recoveryEnabled" + description="Membership recovery enabled (default true)" + is="true" + type="boolean"/> + <attribute name="recoverySleepTime" + description="Sleep time between next socket recovery (5000 msec)" + type="long"/> <attribute name="localMemberName" description="Complete local receiver information" type="java.lang.String" Modified: 
tomcat/container/tc5.5.x/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/changelog.xml?rev=567470&r1=567469&r2=567470&view=diff ============================================================================== --- tomcat/container/tc5.5.x/webapps/docs/changelog.xml (original) +++ tomcat/container/tc5.5.x/webapps/docs/changelog.xml Sun Aug 19 14:04:15 2007 @@ -135,6 +135,9 @@ <subsection name="Cluster"> <changelog> <fix> + <bug>40042</bug>: Recovery membership heartbeat after interface down. (pero) + </fix> + <fix> <bug>42691</bug>: Don't set access time after session sync. Fix that sessions after node restart better expire. Requested by Casey Lucas (pero) </fix> Modified: tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml?rev=567470&r1=567469&r2=567470&view=diff ============================================================================== --- tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml (original) +++ tomcat/container/tc5.5.x/webapps/docs/cluster-howto.xml Sun Aug 19 14:04:15 2007 @@ -497,13 +497,16 @@ mcastClusterDomain="d10" mcastPort="45564" mcastFrequency="1000" - mcastDropTime="30000"/> + mcastDropTime="30000" + recoveryCounter="10" + recoveryEnabled="true" + recoverySleepTime="5000"/> <Receiver className="org.apache.catalina.cluster.tcp.ReplicationListener" tcpListenAddress="auto" tcpListenPort="9015" tcpSelectorTimeout="100" - tcpThreadCount="6" + tcpThreadCount="6"/> <Sender className="org.apache.catalina.cluster.tcp.ReplicationTransmitter" replicationMode="fastasyncqueue" --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]