Author: fhanik
Date: Fri Apr 13 16:26:07 2007
New Revision: 528702

URL: http://svn.apache.org/viewvc?view=rev&rev=528702
Log:
Added a TCP ping for membership, to be used with static memberships and with the TCP failure detector
Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?view=diff&rev=528702&r1=528701&r2=528702 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Fri Apr 13 16:26:07 2007 @@ -180,53 +180,80 @@ } public void heartbeat() { + checkMembers(false); + } + public void checkMembers(boolean checkAll) { + try { if (membership == null) setupMembership(); synchronized (membership) { - //update all alive times - Member[] members = super.getMembers(); - for (int i = 0; members != null && i < members.length; i++) { - if (membership.memberAlive( (MemberImpl) members[i])) { - //we don't have this one in our membership, check to see if he/she is alive - if (memberAlive(members[i])) { - log.warn("Member added, even though we werent notified:" + members[i]); - super.memberAdded(members[i]); - } else { - membership.removeMember( (MemberImpl) members[i]); - } //end if - } //end if - } //for - - //check suspect members if they are still alive, - //if not, simply issue the memberDisappeared message - MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]); - for (int i = 0; i < keys.length; i++) { - MemberImpl m = (MemberImpl) keys[i]; - if (membership.getMember(m) != null && (!memberAlive(m))) { - membership.removeMember(m); - super.memberDisappeared(m); - removeSuspects.remove(m); - log.info("Suspect member, confirmed 
dead.["+m+"]"); - } //end if - } - - //check add suspects members if they are alive now, - //if they are, simply issue the memberAdded message - keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]); - for (int i = 0; i < keys.length; i++) { - MemberImpl m = (MemberImpl) keys[i]; - if ( membership.getMember(m) == null && (memberAlive(m))) { - membership.memberAlive(m); - super.memberAdded(m); - addSuspects.remove(m); - log.info("Suspect member, confirmed alive.["+m+"]"); - } //end if - } + if ( !checkAll ) performBasicCheck(); + else performForcedCheck(); } }catch ( Exception x ) { log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x); } finally { super.heartbeat(); + } + } + + protected void performForcedCheck() { + //update all alive times + Member[] members = super.getMembers(); + for (int i = 0; members != null && i < members.length; i++) { + if (memberAlive(members[i])) { + if (membership.memberAlive((MemberImpl)members[i])) super.memberAdded(members[i]); + addSuspects.remove(members[i]); + } else { + if (membership.getMember(members[i])!=null) { + membership.removeMember((MemberImpl)members[i]); + removeSuspects.remove(members[i]); + super.memberDisappeared((MemberImpl)members[i]); + } + } //end if + } //for + + } + + protected void performBasicCheck() { + //update all alive times + Member[] members = super.getMembers(); + for (int i = 0; members != null && i < members.length; i++) { + if (membership.memberAlive( (MemberImpl) members[i])) { + //we don't have this one in our membership, check to see if he/she is alive + if (memberAlive(members[i])) { + log.warn("Member added, even though we werent notified:" + members[i]); + super.memberAdded(members[i]); + } else { + membership.removeMember( (MemberImpl) members[i]); + } //end if + } //end if + } //for + + //check suspect members if they are still alive, + //if not, simply issue the memberDisappeared message + MemberImpl[] keys = (MemberImpl[]) 
removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]); + for (int i = 0; i < keys.length; i++) { + MemberImpl m = (MemberImpl) keys[i]; + if (membership.getMember(m) != null && (!memberAlive(m))) { + membership.removeMember(m); + super.memberDisappeared(m); + removeSuspects.remove(m); + log.info("Suspect member, confirmed dead.["+m+"]"); + } //end if + } + + //check add suspects members if they are alive now, + //if they are, simply issue the memberAdded message + keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]); + for (int i = 0; i < keys.length; i++) { + MemberImpl m = (MemberImpl) keys[i]; + if ( membership.getMember(m) == null && (memberAlive(m))) { + membership.memberAlive(m); + super.memberAdded(m); + addSuspects.remove(m); + log.info("Suspect member, confirmed alive.["+m+"]"); + } //end if } } Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java?view=auto&rev=528702 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java Fri Apr 13 16:26:07 2007 @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.catalina.tribes.group.interceptors; + +import java.lang.ref.WeakReference; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelInterceptor; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.io.ChannelData; + +/** + * + * Sends a ping to all members. + * Configure this interceptor with the TcpFailureDetector below it, + * and the TcpFailureDetector will act as the membership guide. 
+ * @author Filip Hanik + * @version 1.0 + */ + +public class TcpPingInterceptor extends ChannelInterceptorBase { + + protected static org.apache.juli.logging.Log log = + org.apache.juli.logging.LogFactory.getLog(TcpPingInterceptor.class); + + protected static byte[] TCP_PING_DATA = new byte[] { + 79, -89, 115, 72, 121, -33, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20, + 125, -39, 82, 91, -21, -33, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74, + 55, 21, -66, -121, 69, 33, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50, + 85, -10, -108, -73, 58, -33, 33, 120, -111, 4, 125, -41, 114, -124, -64, -43}; + + protected long interval = 1000; //1 second + + protected boolean useThread = false; + protected boolean staticOnly = false; + protected boolean running = true; + protected PingThread thread = null; + protected static AtomicInteger cnt = new AtomicInteger(0); + + WeakReference<TcpFailureDetector> failureDetector = null; + WeakReference<StaticMembershipInterceptor> staticMembers = null; + + public synchronized void start(int svc) throws ChannelException { + super.start(svc); + running = true; + if ( thread == null ) { + thread = new PingThread(); + thread.setDaemon(true); + thread.setName("TcpPingInterceptor.PingThread-"+cnt.addAndGet(1)); + thread.start(); + } + + //acquire the interceptors to invoke on send ping events + ChannelInterceptor next = getNext(); + while ( next != null ) { + if ( next instanceof TcpFailureDetector ) + failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector)next); + if ( next instanceof StaticMembershipInterceptor ) + staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor)next); + next = next.getNext(); + } + + } + + public void stop(int svc) throws ChannelException { + running = false; + if ( thread != null ) thread.interrupt(); + thread = null; + super.stop(svc); + } + + public void heartbeat() { + super.heartbeat(); + if (!getUseThread()) sendPing(); + } + + public long 
getInterval() { + return interval; + } + + public void setInterval(long interval) { + this.interval = interval; + } + + public void setUseThread(boolean useThread) { + this.useThread = useThread; + } + + public void setStaticOnly(boolean staticOnly) { + this.staticOnly = staticOnly; + } + + public boolean getUseThread() { + return useThread; + } + + public boolean getStaticOnly() { + return staticOnly; + } + + protected void sendPing() { + if (failureDetector.get()!=null) { + //we have a reference to the failure detector + //piggy back on that dude + failureDetector.get().checkMembers(true); + }else { + if (staticOnly && staticMembers.get()!=null) { + sendPingMessage(staticMembers.get().getMembers()); + } else { + sendPingMessage(getMembers()); + } + } + } + + protected void sendPingMessage(Member[] members) { + if ( members == null || members.length == 0 ) return; + ChannelData data = new ChannelData(true);//generates a unique Id + data.setAddress(getLocalMember(false)); + data.setTimestamp(System.currentTimeMillis()); + data.setOptions(getOptionFlag()); + try { + super.sendMessage(members, data, null); + }catch (ChannelException x) { + log.warn("Unable to send TCP ping.",x); + } + } + + public void messageReceived(ChannelMessage msg) { + //catch incoming + boolean process = true; + if ( okToProcess(msg.getOptions()) ) { + //check to see if it is a ping message, if so, process = false + process = ( (msg.getMessage().getLength() != TCP_PING_DATA.length) || + (!Arrays.equals(TCP_PING_DATA,msg.getMessage().getBytes()) ) ); + }//end if + + //ignore the message, it doesnt have the flag set + if ( process ) super.messageReceived(msg); + else if ( log.isDebugEnabled() ) log.debug("Received a TCP ping packet:"+msg); + }//messageReceived + + protected class PingThread extends Thread { + public void run() { + while (running) { + try { + sleep(interval); + sendPing(); + }catch ( InterruptedException ix ) { + interrupted(); + }catch ( Exception x ) { + log.warn("Unable to 
send ping from TCP ping thread.",x); + } + } + } + } + + + + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]