Author: fhanik
Date: Tue May 2 16:46:22 2006
New Revision: 399090
URL: http://svn.apache.org/viewcvs?rev=399090&view=rev
Log:
Refactored the membership layer so that it is not tied to multicasting and the
same logic can be used for other implementations.
Started implementing a TcpFailureDetector, so that we can increase reliability
in the membership service.
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
- copied, changed from r399086,
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
Removed:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml Tue May 2
16:46:22 2006
@@ -217,12 +217,13 @@
<p>
<b>Threadless Interceptor stack</b>
The interceptor don't require any separate threads to perform their
message manipulation.<br/>
- Messages are sent will piggy back on the thread that is sending them all
the way through transmission.
+ Messages that are sent will piggy back on the thread that is sending them
all the way through transmission.
The exception is the <code>MessageDispatchInterceptor</code> that will
queue up the message
and send it on a separate thread for asynchronous message delivery.
Messages received are controlled by a thread pool in the
<code>receiver</code> component.<br/>
The channel object can send a <code>heartbeat()</code> through the
interceptor stack to allow
- for timeouts, cleanup and other events.
+ for timeouts, cleanup and other events.<br/>
+ The <code>MessageDispatchInterceptor</code> is the only interceptor that
is configured by default.
</p>
<p>
<b>Parallel Delivery</b><br/>
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelInterceptorBase.java
Tue May 2 16:46:22 2006
@@ -26,8 +26,7 @@
* @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul
2005) $
*/
-public abstract class ChannelInterceptorBase
- implements ChannelInterceptor {
+public abstract class ChannelInterceptorBase implements ChannelInterceptor {
protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(
ChannelInterceptorBase.class);
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=399090&view=auto
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
(added)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
Tue May 2 16:46:22 2006
@@ -0,0 +1,186 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.ChannelException;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import org.apache.catalina.tribes.Channel;
+import java.util.Arrays;
+import java.net.SocketTimeoutException;
+import org.apache.catalina.tribes.io.ClusterData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.HashMap;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.membership.MemberImpl;
+
+/**
+ * <p>Title: A perfect failure detector </p>
+ *
+ * <p>Description: The TcpFailureDetector is a useful interceptor
+ * that adds reliability to the membership layer.</p>
+ * <p>
+ * If the network is busy, or the system is busy so that the membership
receiver thread
+ * is not getting enough time to update its table, members can be "timed
out"
+ * This failure detector will intercept the memberDisappeared message(unless
its a true shutdown message)
+ * and connect to the member using TCP.
+ *
+ * NOT YET COMPLETE
+ *
+ * </p>
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class TcpFailureDetector extends ChannelInterceptorBase {
+
+ private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog( TcpFailureDetector.class );
+
+ protected static byte[] testMessage = new byte[] {
+ 79, -89, 115, 72, 121, -126, 67, -55, -97, 111, -119, -128, -95, 91,
7, 20,
+ 125, -39, 82, 91, -21, -15, 67, -102, -73, 126, -66, -113, -127, 103,
30, -74,
+ 55, 21, -66, -121, 69, 126, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
+ 85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124,
-64, -43};
+
+ protected boolean performConnectTest = true;
+
+ protected long connectTimeout = 1000;//1 second default
+
+ protected boolean performSendTest = true;
+
+ protected boolean performReadTest = false;
+
+ protected long readTestTimeout = 5000;//5 seconds
+
+ protected Membership membership = null;
+
+ protected HashMap suspect = new HashMap();
+
+ public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload) throws ChannelException {
+ super.sendMessage(destination,msg,payload);
+ }
+
+ public void messageReceived(ChannelMessage msg) {
+ //catch incoming
+ boolean process = true;
+ if ( okToProcess(msg.getOptions()) ) {
+ //check to see if it is a testMessage, if so, process = false
+ process = ( (msg.getMessage().getLength() != testMessage.length) ||
+
(!Arrays.equals(testMessage,msg.getMessage().getBytes()) ) );
+ }//end if
+
+ //ignore the message, it doesnt have the flag set
+ if ( process ) super.messageReceived(msg);
+ else if ( log.isInfoEnabled() ) log.info("Received a failure detector
packet:"+msg);
+ }//messageReceived
+
+
+ public synchronized void memberAdded(Member member) {
+ if ( membership == null ) setupMembership();
+ if ( suspect.containsKey(member) ) suspect.remove(member);
+ else {
+ //not correct, this could make the membership out of sync
+ membership.addMember((MemberImpl)member);
+ super.memberAdded(member);
+ }
+ }
+
+ public synchronized void memberDisappeared(Member member) {
+ if ( membership == null ) setupMembership();
+ //check to see if the member really is gone
+ //if the payload is not a shutdown message
+ if ( !memberAlive(member) ) {
+ //not correct, we need to maintain the map
+ membership.removeMember((MemberImpl)member);
+ super.memberDisappeared(member);
+ } else suspect.put(member,new Long(System.currentTimeMillis()));
+ }
+
+ public boolean hasMembers() {
+ return super.hasMembers();
+ }
+
+ public Member[] getMembers() {
+ if ( membership == null ) setupMembership();
+ return membership.getMembers();
+ }
+
+ public Member getMember(Member mbr) {
+ return super.getMember(mbr);
+ }
+
+ public Member getLocalMember(boolean incAlive) {
+ return super.getLocalMember(incAlive);
+ }
+
+ public void heartbeat() {
+ //todo, implement an expiration of members that we deemed alive
+ //check them again and act accordingly
+
+ super.heartbeat();
+ }
+
+ protected synchronized void setupMembership() {
+ if ( membership == null ) {
+ membership = new
Membership((MemberImpl)super.getLocalMember(true));
+ }
+
+ }
+
+
+ protected boolean memberAlive(Member mbr) {
+ if ( Arrays.equals(mbr.getPayload(),Member.SHUTDOWN_PAYLOAD) ) return
false;
+
+ Socket socket = new Socket();
+ try {
+ InetAddress ia = InetAddress.getByAddress(mbr.getHost());
+ InetSocketAddress addr = new InetSocketAddress(ia, mbr.getPort());
+ socket.setSoTimeout((int)readTestTimeout);
+ socket.connect(addr, (int) connectTimeout);
+ if ( performSendTest ) {
+ ClusterData data = new ClusterData(true);
+ data.setAddress(mbr);
+ data.setMessage(new XByteBuffer(testMessage,false));
+ data.setTimestamp(System.currentTimeMillis());
+ int options = getOptionFlag() |
Channel.SEND_OPTIONS_BYTE_MESSAGE;
+ if ( performReadTest ) options = (options |
Channel.SEND_OPTIONS_USE_ACK);
+ data.setOptions(options);
+ byte[] message = XByteBuffer.createDataPackage(data);
+ socket.getOutputStream().write(message);
+ if ( performReadTest ) {
+ int length = socket.getInputStream().read(message);
+ return length > 0;
+ }
+ }//end if
+ return true;
+ } catch ( SocketTimeoutException sx) {
+ //do nothing, we couldn't connect
+ }catch (Exception x ) {
+ log.error("Unable to perform failure detection check, assuming
member down.",x);
+ } finally {
+ try {socket.close(); } catch ( Exception ignore ){}
+ }
+ return false;
+ }
+
+
+
+
+}
\ No newline at end of file
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
Tue May 2 16:46:22 2006
@@ -22,6 +22,7 @@
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.util.UUIDGenerator;
import org.apache.catalina.tribes.Channel;
+import java.sql.Timestamp;
/**
* The cluster data class is used to transport around the byte array from
@@ -243,5 +244,23 @@
return ( (Channel.SEND_OPTIONS_USE_ACK & options) ==
Channel.SEND_OPTIONS_USE_ACK) &&
( (Channel.SEND_OPTIONS_SYNCHRONIZED_ACK & options) !=
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
}
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("ClusterData[src=");
+ buf.append(getAddress()).append("; id=");
+ buf.append(bToS(getUniqueId())).append("; sent=");
+ buf.append(new Timestamp(this.getTimestamp()).toString()).append("]");
+ return buf.toString();
+ }
+
+ public static String bToS(byte[] data) {
+ StringBuffer buf = new StringBuffer(4*16);
+ buf.append("{");
+ for (int i=0; data!=null && i<data.length; i++ )
buf.append(String.valueOf(data[i])).append(" ");
+ buf.append("}");
+ return buf.toString();
+ }
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
Tue May 2 16:46:22 2006
@@ -81,7 +81,7 @@
/**
* The membership, used so that we calculate memberships when they arrive
or don't arrive
*/
- protected McastMembership membership;
+ protected Membership membership;
/**
* The actual listener, for callback when shits goes down
*/
@@ -148,7 +148,7 @@
receivePacket = new DatagramPacket(new byte[1024],1024);
receivePacket.setAddress(address);
receivePacket.setPort(port);
- membership = new McastMembership(member);
+ membership = new Membership(member);
timeToExpiration = expireTime;
this.service = service;
this.sendFrequency = sendFrequency;
@@ -239,7 +239,7 @@
if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) {
if (log.isDebugEnabled()) log.debug("Member has shutdown:" +
m);
- membership.removeMcastMember(m);
+ membership.removeMember(m);
service.memberDisappeared(m);
} else if (membership.memberAlive(m)) {
if (log.isDebugEnabled())
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=399090&r1=399089&r2=399090&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
Tue May 2 16:46:22 2006
@@ -391,7 +391,7 @@
return buf.toString();
}
- protected static String bToS(byte[] data) {
+ public static String bToS(byte[] data) {
StringBuffer buf = new StringBuffer(4*16);
buf.append("{");
for (int i=0; data!=null && i<data.length; i++ )
buf.append(String.valueOf(data[i])).append(" ");
Copied:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
(from r399086,
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java)
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java?p2=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java&p1=tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java&r1=399086&r2=399090&rev=399090&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Membership.java
Tue May 2 16:46:22 2006
@@ -35,7 +35,7 @@
* @author Peter Rossbach
* @version $Revision: 356540 $, $Date: 2005-12-13 10:53:40 -0600 (Tue, 13 Dec
2005) $
*/
-public class McastMembership
+public class Membership
{
protected static final MemberImpl[] EMPTY_MEMBERS = new MemberImpl[0];
@@ -64,7 +64,7 @@
* Constructs a new membership
* @param name - has to be the name of the local member. Used to filter
the local member from the cluster membership
*/
- public McastMembership(MemberImpl local) {
+ public Membership(MemberImpl local) {
this.local = local;
}
@@ -94,7 +94,7 @@
if ( entry == null ) {
entry = new MbrEntry(member);
map.put(member,entry);
- addMcastMember(member);
+ addMember(member);
result = true;
} else {
//update the member alive time
@@ -113,7 +113,7 @@
* Add a member to this component and sort array with memberComparator
* @param member The member to add
*/
- protected void addMcastMember(MemberImpl member) {
+ public void addMember(MemberImpl member) {
synchronized (members) {
MemberImpl results[] =
new MemberImpl[members.length + 1];
@@ -130,7 +130,7 @@
*
* @param member The member to remove
*/
- protected void removeMcastMember(MemberImpl member) {
+ public void removeMember(MemberImpl member) {
map.remove(member);
synchronized (members) {
int n = -1;
@@ -179,7 +179,7 @@
MemberImpl[] result = new MemberImpl[list.size()];
list.toArray(result);
for( int j=0; j<result.length; j++) {
- removeMcastMember(result[j]);
+ removeMember(result[j]);
}
return result;
} else {
@@ -277,7 +277,7 @@
}
/**
- * Return the actual McastMember object
+ * Return the actual Member object
*/
public MemberImpl getMember() {
return mbr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]