Author: fhanik Date: Tue May 2 16:46:22 2006 New Revision: 399090 URL: http://svn.apache.org/viewcvs?rev=399090&view=rev Log: Refactored the membership layer to not be tied to multicasting, so that the same logic can be used for other implementations. Started implementing a TcpFailureDetector, so that we can increase reliability in the membership service.
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java - copied, changed from r399086, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java Removed: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml?rev=399090&r1=399089&r2=399090&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml (original) +++ tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml Tue May 2 16:46:22 2006 @@ -217,12 +217,13 @@ <p> <b>Threadless Interceptor stack</b> The interceptor don't require any separate threads to perform their message manipulation.<br/> - Messages are sent will piggy back on the thread that is sending them all the way through transmission. + Messages that are sent will piggy back on the thread that is sending them all the way through transmission. The exception is the <code>MessageDispatchInterceptor</code> that will queue up the message and send it on a separate thread for asynchronous message delivery. 
Messages received are controlled by a thread pool in the <code>receiver</code> component.<br/> The channel object can send a <code>heartbeat()</code> through the interceptor stack to allow - for timeouts, cleanup and other events. + for timeouts, cleanup and other events.<br/> + The <code>MessageDispatchInterceptor</code> is the only interceptor that is configured by default. </p> <p> <b>Parallel Delivery</b><br/> Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=399090&r1=399089&r2=399090&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java Tue May 2 16:46:22 2006 @@ -26,8 +26,7 @@ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */ -public abstract class ChannelInterceptorBase - implements ChannelInterceptor { +public abstract class ChannelInterceptorBase implements ChannelInterceptor { protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( ChannelInterceptorBase.class); Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=399090&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (added) +++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Tue May 2 16:46:22 2006 @@ -0,0 +1,186 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + */ +package org.apache.catalina.tribes.group.interceptors; + +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.group.InterceptorPayload; +import org.apache.catalina.tribes.ChannelException; +import java.net.Socket; +import java.net.InetSocketAddress; +import java.net.InetAddress; +import org.apache.catalina.tribes.Channel; +import java.util.Arrays; +import java.net.SocketTimeoutException; +import org.apache.catalina.tribes.io.ClusterData; +import org.apache.catalina.tribes.io.XByteBuffer; +import java.util.HashMap; +import org.apache.catalina.tribes.membership.Membership; +import org.apache.catalina.tribes.membership.MemberImpl; + +/** + * <p>Title: A perfect failure detector </p> + * + * <p>Description: The TcpFailureDetector is a useful interceptor + * that adds reliability to the membership layer.</p> + * <p> + * If the network is busy, or the system is busy so that the membership receiver thread + * is not getting enough time to update its table, members can be "timed out" + * This failure detector will intercept the memberDisappeared message(unless its a true shutdown message) 
+ * and connect to the member using TCP. + * + * NOT YET COMPLETE + * + * </p> + * + * @author Filip Hanik + * @version 1.0 + */ +public class TcpFailureDetector extends ChannelInterceptorBase { + + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class ); + + protected static byte[] testMessage = new byte[] { + 79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20, + 125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74, + 55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50, + 85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, -64, -43}; + + protected boolean performConnectTest = true; + + protected long connectTimeout = 1000;//1 second default + + protected boolean performSendTest = true; + + protected boolean performReadTest = false; + + protected long readTestTimeout = 5000;//5 seconds + + protected Membership membership = null; + + protected HashMap suspect = new HashMap(); + + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + super.sendMessage(destination,msg,payload); + } + + public void messageReceived(ChannelMessage msg) { + //catch incoming + boolean process = true; + if ( okToProcess(msg.getOptions()) ) { + //check to see if it is a testMessage, if so, process = false + process = ( (msg.getMessage().getLength() != testMessage.length) || + (!Arrays.equals(testMessage,msg.getMessage().getBytes()) ) ); + }//end if + + //ignore the message, it doesnt have the flag set + if ( process ) super.messageReceived(msg); + else if ( log.isInfoEnabled() ) log.info("Received a failure detector packet:"+msg); + }//messageReceived + + + public synchronized void memberAdded(Member member) { + if ( membership == null ) setupMembership(); + if ( suspect.containsKey(member) ) suspect.remove(member); + else { + //not correct, this could make the membership out 
of sync + membership.addMember((MemberImpl)member); + super.memberAdded(member); + } + } + + public synchronized void memberDisappeared(Member member) { + if ( membership == null ) setupMembership(); + //check to see if the member really is gone + //if the payload is not a shutdown message + if ( !memberAlive(member) ) { + //not correct, we need to maintain the map + membership.removeMember((MemberImpl)member); + super.memberDisappeared(member); + } else suspect.put(member,new Long(System.currentTimeMillis())); + } + + public boolean hasMembers() { + return super.hasMembers(); + } + + public Member[] getMembers() { + if ( membership == null ) setupMembership(); + return membership.getMembers(); + } + + public Member getMember(Member mbr) { + return super.getMember(mbr); + } + + public Member getLocalMember(boolean incAlive) { + return super.getLocalMember(incAlive); + } + + public void heartbeat() { + //todo, implement an expiration of members that we deemed alive + //check them again and act accordingly + + super.heartbeat(); + } + + protected synchronized void setupMembership() { + if ( membership == null ) { + membership = new Membership((MemberImpl)super.getLocalMember(true)); + } + + } + + + protected boolean memberAlive(Member mbr) { + if ( Arrays.equals(mbr.getPayload(),Member.SHUTDOWN_PAYLOAD) ) return false; + + Socket socket = new Socket(); + try { + InetAddress ia = InetAddress.getByAddress(mbr.getHost()); + InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort()); + socket.setSoTimeout((int)readTestTimeout); + socket.connect(addr, (int) connectTimeout); + if ( performSendTest ) { + ClusterData data = new ClusterData(true); + data.setAddress(mbr); + data.setMessage(new XByteBuffer(testMessage,false)); + data.setTimestamp(System.currentTimeMillis()); + int options = getOptionFlag() | Channel.SEND_OPTIONS_BYTE_MESSAGE; + if ( performReadTest ) options = (options | Channel.SEND_OPTIONS_USE_ACK); + data.setOptions(options); + byte[] message = 
XByteBuffer.createDataPackage(data); + socket.getOutputStream().write(message); + if ( performReadTest ) { + int length = socket.getInputStream().read(message); + return length > 0; + } + }//end if + return true; + } catch ( SocketTimeoutException sx) { + //do nothing, we couldn't connect + }catch (Exception x ) { + log.error("Unable to perform failure detection check, assuming member down.",x); + } finally { + try {socket.close(); } catch ( Exception ignore ){} + } + return false; + } + + + + +} \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=399090&r1=399089&r2=399090&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Tue May 2 16:46:22 2006 @@ -22,6 +22,7 @@ import org.apache.catalina.tribes.membership.MemberImpl; import org.apache.catalina.tribes.util.UUIDGenerator; import org.apache.catalina.tribes.Channel; +import java.sql.Timestamp; /** * The cluster data class is used to transport around the byte array from @@ -243,5 +244,23 @@ return ( (Channel.SEND_OPTIONS_USE_ACK & options) == Channel.SEND_OPTIONS_USE_ACK) && ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); } + + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append("ClusterData[src="); + buf.append(getAddress()).append("; id="); + buf.append(bToS(getUniqueId())).append("; sent="); + buf.append(new Timestamp(this.getTimestamp()).toString()).append("]"); + return buf.toString(); + } + + public static String bToS(byte[] data) { + StringBuffer buf = new 
StringBuffer(4*16); + buf.append("{"); + for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" "); + buf.append("}"); + return buf.toString(); + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=399090&r1=399089&r2=399090&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Tue May 2 16:46:22 2006 @@ -81,7 +81,7 @@ /** * The membership, used so that we calculate memberships when they arrive or don't arrive */ - protected McastMembership membership; + protected Membership membership; /** * The actual listener, for callback when shits goes down */ @@ -148,7 +148,7 @@ receivePacket = new DatagramPacket(new byte[1024],1024); receivePacket.setAddress(address); receivePacket.setPort(port); - membership = new McastMembership(member); + membership = new Membership(member); timeToExpiration = expireTime; this.service = service; this.sendFrequency = sendFrequency; @@ -239,7 +239,7 @@ if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) { if (log.isDebugEnabled()) log.debug("Member has shutdown:" + m); - membership.removeMcastMember(m); + membership.removeMember(m); service.memberDisappeared(m); } else if (membership.memberAlive(m)) { if (log.isDebugEnabled()) Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=399090&r1=399089&r2=399090&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Tue May 2 16:46:22 2006 @@ -391,7 +391,7 @@ return buf.toString(); } - protected static String bToS(byte[] data) { + public static String bToS(byte[] data) { StringBuffer buf = new StringBuffer(4*16); buf.append("{"); for (int i=0; data!=null && i<data.length; i++ ) buf.append(String.valueOf(data[i])).append(" "); Copied: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java (from r399086, tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java) URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java?p2=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java&p1=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java&r1=399086&r2=399090&rev=399090&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java Tue May 2 16:46:22 2006 @@ -35,7 +35,7 @@ * @author Peter Rossbach * @version $Revision: 356540 $, $Date: 2005-12-13 10:53:40 -0600 (Tue, 13 Dec 2005) $ */ -public class McastMembership +public class Membership { protected static final 
MemberImpl[] EMPTY_MEMBERS = new MemberImpl[0]; @@ -64,7 +64,7 @@ * Constructs a new membership * @param name - has to be the name of the local member. Used to filter the local member from the cluster membership */ - public McastMembership(MemberImpl local) { + public Membership(MemberImpl local) { this.local = local; } @@ -94,7 +94,7 @@ if ( entry == null ) { entry = new MbrEntry(member); map.put(member,entry); - addMcastMember(member); + addMember(member); result = true; } else { //update the member alive time @@ -113,7 +113,7 @@ * Add a member to this component and sort array with memberComparator * @param member The member to add */ - protected void addMcastMember(MemberImpl member) { + public void addMember(MemberImpl member) { synchronized (members) { MemberImpl results[] = new MemberImpl[members.length + 1]; @@ -130,7 +130,7 @@ * * @param member The member to remove */ - protected void removeMcastMember(MemberImpl member) { + public void removeMember(MemberImpl member) { map.remove(member); synchronized (members) { int n = -1; @@ -179,7 +179,7 @@ MemberImpl[] result = new MemberImpl[list.size()]; list.toArray(result); for( int j=0; j<result.length; j++) { - removeMcastMember(result[j]); + removeMember(result[j]); } return result; } else { @@ -277,7 +277,7 @@ } /** - * Return the actual McastMember object + * Return the actual Member object */ public MemberImpl getMember() { return mbr; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]