Author: fhanik
Date: Tue May  2 16:46:22 2006
New Revision: 399090

URL: http://svn.apache.org/viewcvs?rev=399090&view=rev
Log:
Refactored the membership layer to not be tied to multicasting, so that the same 
logic can be used for other implementations
Started implementing a TcpFailureDetector, so that we can increase reliability 
in the membership service

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
      - copied, changed from r399086, 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
Removed:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml Tue May  2 
16:46:22 2006
@@ -217,12 +217,13 @@
   <p>
     <b>Threadless Interceptor stack</b>
     The interceptor don't require any separate threads to perform their 
message manipulation.<br/>
-    Messages are sent will piggy back on the thread that is sending them all 
the way through transmission.
+    Messages that are sent will piggy back on the thread that is sending them 
all the way through transmission.
     The exception is the <code>MessageDispatchInterceptor</code> that will 
queue up the message
     and send it on a separate thread for asynchronous message delivery.
     Messages received are controlled by a thread pool in the 
<code>receiver</code> component.<br/>
     The channel object can send a <code>heartbeat()</code> through the 
interceptor stack to allow 
-    for timeouts, cleanup and other events.
+    for timeouts, cleanup and other events.<br/>
+    The <code>MessageDispatchInterceptor</code> is the only interceptor that 
is configured by default.
   </p>
   <p>
     <b>Parallel Delivery</b><br/>

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
 Tue May  2 16:46:22 2006
@@ -26,8 +26,7 @@
  * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 
2005) $
  */
 
-public abstract class ChannelInterceptorBase
-    implements ChannelInterceptor {
+public abstract class ChannelInterceptorBase implements ChannelInterceptor {
 
     protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(
         ChannelInterceptorBase.class);

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=399090&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
 Tue May  2 16:46:22 2006
@@ -0,0 +1,186 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.ChannelException;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import org.apache.catalina.tribes.Channel;
+import java.util.Arrays;
+import java.net.SocketTimeoutException;
+import org.apache.catalina.tribes.io.ClusterData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.HashMap;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.membership.MemberImpl;
+
+/**
+ * <p>Title: A perfect failure detector </p>
+ *
+ * <p>Description: The TcpFailureDetector is a useful interceptor
+ * that adds reliability to the membership layer.</p>
+ * <p>
+ * If the network is busy, or the system is busy so that the membership 
receiver thread
+ * is not getting enough time to update its table, members can be &quot;timed 
out&quot;.
+ * This failure detector will intercept the memberDisappeared message (unless 
it's a true shutdown message)
+ * and connect to the member using TCP.
+ * 
+ * NOT YET COMPLETE
+ *    
+ * </p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class TcpFailureDetector extends ChannelInterceptorBase {
+    
+    private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
+    
+    protected static byte[] testMessage = new byte[] {
+        79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91, 
7, 20,
+        125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103, 
30, -74,
+        55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
+        85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124, 
-64, -43};      
+    
+    protected boolean performConnectTest = true;
+
+    protected long connectTimeout = 1000;//1 second default
+    
+    protected boolean performSendTest = true;
+
+    protected boolean performReadTest = false;
+    
+    protected long readTestTimeout = 5000;//5 seconds
+    
+    protected Membership membership = null;
+    
+    protected HashMap suspect = new HashMap();
+    
+    public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
+        super.sendMessage(destination,msg,payload);
+    }
+
+    public void messageReceived(ChannelMessage msg) {
+        //catch incoming 
+        boolean process = true;
+        if ( okToProcess(msg.getOptions()) ) {
+            //check to see if it is a testMessage, if so, process = false
+            process = ( (msg.getMessage().getLength() != testMessage.length) ||
+                        
(!Arrays.equals(testMessage,msg.getMessage().getBytes()) ) );
+        }//end if
+            
+        //ignore the message, it doesn't have the flag set
+        if ( process ) super.messageReceived(msg);
+        else if ( log.isInfoEnabled() ) log.info("Received a failure detector 
packet:"+msg);
+    }//messageReceived
+    
+    
+    public synchronized void memberAdded(Member member) {
+        if ( membership == null ) setupMembership();
+        if ( suspect.containsKey(member) ) suspect.remove(member);
+        else {
+            //not correct, this could make the membership out of sync
+            membership.addMember((MemberImpl)member);
+            super.memberAdded(member);
+        }
+    }
+
+    public synchronized void memberDisappeared(Member member) {
+        if ( membership == null ) setupMembership();
+        //check to see if the member really is gone
+        //if the payload is not a shutdown message
+        if ( !memberAlive(member) ) {
+            //not correct, we need to maintain the map
+            membership.removeMember((MemberImpl)member);
+            super.memberDisappeared(member);
+        } else suspect.put(member,new Long(System.currentTimeMillis()));
+    }
+    
+    public boolean hasMembers() {
+        return super.hasMembers();
+    }
+
+    public Member[] getMembers() {
+        if ( membership == null ) setupMembership();
+        return membership.getMembers();
+    }
+
+    public Member getMember(Member mbr) {
+        return super.getMember(mbr);
+    }
+
+    public Member getLocalMember(boolean incAlive) {
+        return super.getLocalMember(incAlive);
+    }
+    
+    public void heartbeat() {
+        //todo, implement an expiration of members that we deemed alive
+        //check them again and act accordingly
+        
+        super.heartbeat();
+    }
+    
+    protected synchronized void setupMembership() {
+        if ( membership == null ) {
+            membership = new 
Membership((MemberImpl)super.getLocalMember(true));
+        }
+        
+    }
+    
+    
+    protected boolean memberAlive(Member mbr) {
+        if ( Arrays.equals(mbr.getPayload(),Member.SHUTDOWN_PAYLOAD) ) return 
false;
+        
+        Socket socket = new Socket();        
+        try {
+            InetAddress ia = InetAddress.getByAddress(mbr.getHost());
+            InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
+            socket.setSoTimeout((int)readTestTimeout);
+            socket.connect(addr, (int) connectTimeout);
+            if ( performSendTest ) {
+                ClusterData data = new ClusterData(true);
+                data.setAddress(mbr);
+                data.setMessage(new XByteBuffer(testMessage,false));
+                data.setTimestamp(System.currentTimeMillis());
+                int options = getOptionFlag() | 
Channel.SEND_OPTIONS_BYTE_MESSAGE;
+                if ( performReadTest ) options = (options | 
Channel.SEND_OPTIONS_USE_ACK);
+                data.setOptions(options);
+                byte[] message = XByteBuffer.createDataPackage(data);
+                socket.getOutputStream().write(message);
+                if ( performReadTest ) {
+                    int length = socket.getInputStream().read(message);
+                    return length > 0;
+                }
+            }//end if
+            return true;
+        } catch ( SocketTimeoutException sx) {
+            //do nothing, we couldn't connect
+        }catch (Exception x ) {
+            log.error("Unable to perform failure detection check, assuming 
member down.",x);
+        } finally {
+            try {socket.close(); } catch ( Exception ignore ){}
+        }
+        return false;
+    }
+
+
+
+    
+}
\ No newline at end of file

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
 Tue May  2 16:46:22 2006
@@ -22,6 +22,7 @@
 import org.apache.catalina.tribes.membership.MemberImpl;
 import org.apache.catalina.tribes.util.UUIDGenerator;
 import org.apache.catalina.tribes.Channel;
+import java.sql.Timestamp;
 
 /**
  * The cluster data class is used to transport around the byte array from
@@ -243,5 +244,23 @@
         return ( (Channel.SEND_OPTIONS_USE_ACK & options) == 
Channel.SEND_OPTIONS_USE_ACK) &&
             ( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) != 
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
     }
+    
+    public String toString() {
+        StringBuffer buf = new StringBuffer();
+        buf.append("ClusterData[src=");
+        buf.append(getAddress()).append("; id=");
+        buf.append(bToS(getUniqueId())).append("; sent=");
+        buf.append(new Timestamp(this.getTimestamp()).toString()).append("]");
+        return buf.toString();
+    }
+    
+    public static String bToS(byte[] data) {
+        StringBuffer buf = new StringBuffer(4*16);
+        buf.append("{");
+        for (int i=0; data!=null && i<data.length; i++ ) 
buf.append(String.valueOf(data[i])).append(" ");
+        buf.append("}");
+        return buf.toString();
+    }
+
     
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
 Tue May  2 16:46:22 2006
@@ -81,7 +81,7 @@
     /**
      * The membership, used so that we calculate memberships when they arrive 
or don't arrive
      */
-    protected McastMembership membership;
+    protected Membership membership;
     /**
      * The actual listener, for callback when shits goes down
      */
@@ -148,7 +148,7 @@
         receivePacket = new DatagramPacket(new byte[1024],1024);
         receivePacket.setAddress(address);
         receivePacket.setPort(port);
-        membership = new McastMembership(member);
+        membership = new Membership(member);
         timeToExpiration = expireTime;
         this.service = service;
         this.sendFrequency = sendFrequency;
@@ -239,7 +239,7 @@
 
             if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) {
                 if (log.isDebugEnabled()) log.debug("Member has shutdown:" + 
m);
-                membership.removeMcastMember(m);
+                membership.removeMember(m);
                 service.memberDisappeared(m);
             } else if (membership.memberAlive(m)) {
                 if (log.isDebugEnabled())

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
 Tue May  2 16:46:22 2006
@@ -391,7 +391,7 @@
         return buf.toString();
     }
     
-    protected static String bToS(byte[] data) {
+    public static String bToS(byte[] data) {
         StringBuffer buf = new StringBuffer(4*16);
         buf.append("{");
         for (int i=0; data!=null && i<data.length; i++ ) 
buf.append(String.valueOf(data[i])).append(" ");

Copied: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
 (from r399086, 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java)
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java?p2=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java&p1=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java&r1=399086&r2=399090&rev=399090&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
 Tue May  2 16:46:22 2006
@@ -35,7 +35,7 @@
  * @author Peter Rossbach
  * @version $Revision: 356540 $, $Date: 2005-12-13 10:53:40 -0600 (Tue, 13 Dec 
2005) $
  */
-public class McastMembership
+public class Membership
 {
     protected static final MemberImpl[] EMPTY_MEMBERS = new MemberImpl[0];
     
@@ -64,7 +64,7 @@
      * Constructs a new membership
      * @param name - has to be the name of the local member. Used to filter 
the local member from the cluster membership
      */
-    public McastMembership(MemberImpl local) {
+    public Membership(MemberImpl local) {
         this.local = local;
     }
 
@@ -94,7 +94,7 @@
         if ( entry == null ) {
             entry = new MbrEntry(member);
             map.put(member,entry);
-            addMcastMember(member);
+            addMember(member);
             result = true;
        } else {
             //update the member alive time
@@ -113,7 +113,7 @@
      * Add a member to this component and sort array with memberComparator
      * @param member The member to add
      */
-    protected void addMcastMember(MemberImpl member) {
+    public void addMember(MemberImpl member) {
       synchronized (members) {
           MemberImpl results[] =
             new MemberImpl[members.length + 1];
@@ -130,7 +130,7 @@
      * 
      * @param member The member to remove
      */
-    protected void removeMcastMember(MemberImpl member) {
+    public void removeMember(MemberImpl member) {
         map.remove(member);
         synchronized (members) {
             int n = -1;
@@ -179,7 +179,7 @@
             MemberImpl[] result = new MemberImpl[list.size()];
             list.toArray(result);
             for( int j=0; j<result.length; j++) {
-                removeMcastMember(result[j]);
+                removeMember(result[j]);
             }
             return result;
         } else {
@@ -277,7 +277,7 @@
         }
 
         /**
-         * Return the actual McastMember object
+         * Return the actual Member object
          */
         public MemberImpl getMember() {
             return mbr;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to