Author: fhanik Date: Tue May 2 08:31:11 2006 New Revision: 398964 URL: http://svn.apache.org/viewcvs?rev=398964&view=rev Log: Added in a stop message for a soft shutdown Added in the ability to send a payload with each broadcast package
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=398964&r1=398963&r2=398964&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java Tue May 2 08:31:11 2006 @@ -73,4 +73,10 @@ */ public byte[] getUniqueId(); + /** + * returns the payload associated with this member + * @return byte[] + */ + public byte[] getPayload(); + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java?rev=398964&r1=398963&r2=398964&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java Tue May 2 08:31:11 2006 @@ -108,5 +108,12 @@ * removes the membership listener. */ public void removeMembershipListener(); + + /** + * Set a payload to be broadcasted with each membership + * broadcast. + * @param payload byte[] + */ + public void setPayload(byte[] payload); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java?rev=398964&r1=398963&r2=398964&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java Tue May 2 08:31:11 2006 @@ -134,7 +134,7 @@ synchronized (members) { int n = -1; for (int i = 0; i < members.length; i++) { - if (members[i] == member) { + if (members[i] == member || members[i].equals(member)) { n = i; break; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java?rev=398964&r1=398963&r2=398964&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java Tue May 2 08:31:11 2006 @@ -68,6 +68,8 @@ protected MemberImpl localMember ; private int mcastSoTimeout; private int mcastTTL; + + protected byte[] payload; /** * Create a membership service. @@ -232,6 +234,7 @@ localMember.setPort(port); localMember.setMemberAliveTime(100); } + if ( this.payload != null ) localMember.setPayload(payload); localMember.setUniqueId(UUIDGenerator.randomUUID(true)); localMember.setServiceStartTime(System.currentTimeMillis()); java.net.InetAddress bind = null; @@ -372,9 +375,18 @@ public int getMcastTTL() { return mcastTTL; } + + public byte[] getPayload() { + return payload; + } + public void setMcastTTL(int mcastTTL) { this.mcastTTL = mcastTTL; properties.setProperty("mcastTTL", String.valueOf(mcastTTL)); + } + + public void setPayload(byte[] payload) { + this.payload = payload; } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=398964&r1=398963&r2=398964&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Tue May 2 08:31:11 2006 @@ -23,6 +23,8 @@ import java.net.MulticastSocket; import org.apache.catalina.tribes.MembershipListener; +import java.util.Arrays; +import java.net.SocketTimeoutException; /** * A <b>membership</b> implementation using simple multicast. @@ -97,9 +99,20 @@ */ protected long serviceStartTime = System.currentTimeMillis(); + /** + * Time to live for the multicast packets that are being sent out + */ protected int mcastTTL = -1; + /** + * Read timeout on the mcast socket + */ protected int mcastSoTimeout = -1; + /** + * bind address + */ protected InetAddress mcastBindAddress = null; + + protected static final byte[] STOP_PAYLOAD = new byte[] {66, 65, 66, 89, 45, 65, 76, 69, 88}; /** * Create a new mcast service impl @@ -130,10 +143,10 @@ this.mcastTTL = ttl; this.mcastBindAddress = bind; setupSocket(); - sendPacket = new DatagramPacket(new byte[1000],1000); + sendPacket = new DatagramPacket(new byte[1024],1024); sendPacket.setAddress(address); sendPacket.setPort(port); - receivePacket = new DatagramPacket(new byte[1000],1000); + receivePacket = new DatagramPacket(new byte[1024],1024); receivePacket.setAddress(address); receivePacket.setPort(port); membership = new McastMembership(member); @@ -193,10 +206,18 @@ * @throws IOException if the service fails to disconnect from the sockets */ public synchronized void stop() throws IOException { - socket.leaveGroup(address); doRun = false; + sender.interrupt(); + receiver.interrupt(); sender = null; receiver = null; + //send a stop message + byte[] payload = member.getPayload(); + member.setPayload(STOP_PAYLOAD); + send(); + //restore payload + member.setPayload(payload); + socket.leaveGroup(address); serviceStartTime = Long.MAX_VALUE; } @@ -205,23 +226,42 @@ * @throws IOException */ public void receive() throws IOException { - socket.receive(receivePacket); - byte[] data = new byte[receivePacket.getLength()]; - System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length); - MemberImpl m = MemberImpl.getMember(data); - if(log.isDebugEnabled()) - log.debug("Mcast receive ping from member " + m); + checkExpired(); + try { + socket.receive(receivePacket); + byte[] data = new byte[receivePacket.getLength()]; + System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length); + MemberImpl m = MemberImpl.getMember(data); + if (log.isDebugEnabled()) + log.debug("Mcast receive ping from member " + m); - if ( membership.memberAlive(m) ) { - if(log.isDebugEnabled()) - log.debug("Mcast add member " + m); - service.memberAdded(m); + if (Arrays.equals(m.getPayload(), STOP_PAYLOAD)) { + if (log.isDebugEnabled()) + log.debug("Mcast member shutdown" + m); + membership.removeMcastMember(m); + service.memberDisappeared(m); + } else if (membership.memberAlive(m)) { + if (log.isDebugEnabled()) + log.debug("Mcast add member " + m); + service.memberAdded(m); + } //end if + } catch (SocketTimeoutException x ) { + //do nothing, this is normal, we don't want to block forever + //since the receive thread is the same thread + //that does membership expiration } + } + + protected synchronized void checkExpired() { MemberImpl[] expired = membership.expire(timeToExpiration); for ( int i=0; i<expired.length; i++) { if(log.isDebugEnabled()) - log.debug("Mcast exipre member " + m); - service.memberDisappeared(expired[i]); + log.debug("Mcast exipre member " + expired[i]); + try { + service.memberDisappeared(expired[i]); + }catch ( Exception x ) { + log.error("Unable to process member disappeared message.",x); + } } } @@ -229,7 +269,7 @@ * Send a ping * @throws Exception */ - public void send() throws Exception{ + public void send() throws IOException{ member.inc(); if(log.isDebugEnabled()) log.debug("Mcast send ping from member " + member); @@ -238,6 +278,7 @@ p.setAddress(address); p.setPort(port); socket.send(p); + checkExpired(); } public long getServiceStartTime() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=398964&r1=398963&r2=398964&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Tue May 2 08:31:11 2006 @@ -83,8 +83,22 @@ */ protected transient long serviceStartTime; + /** + * To avoid serialization over and over again, once the local dataPkg + * has been set, we use that to transmit data + */ protected transient byte[] dataPkg = null; - private byte[] uniqueId = new byte[16]; + + /** + * Unique session Id for this member + */ + protected byte[] uniqueId = new byte[16]; + + /** + * Custom payload that an app framework can broadcast + * Also used to transport stop command. + */ + protected byte[] payload = new byte[0]; /** * Empty constructor for serialization @@ -178,13 +192,15 @@ //dlen - 4 bytes //domain - dlen bytes //uniqueId - 16 bytes + //payload length - 4 bytes + //payload plen bytes byte[] domaind = this.domain; byte[] addr = host; long alive=System.currentTimeMillis()-getServiceStartTime(); byte hl = (byte)addr.length; - byte[] data = new byte[8+4+1+addr.length+4+domaind.length+16]; + byte[] data = new byte[8+4+1+addr.length+4+domaind.length+16+4+payload.length]; int pos = 0; //alive data XByteBuffer.toBytes((long)alive,data,0); @@ -205,6 +221,14 @@ pos+=domaind.length; //unique Id System.arraycopy(uniqueId,0,data,pos,uniqueId.length); + pos+=uniqueId.length; + //payload + XByteBuffer.toBytes(payload.length,data,pos); + pos+=4; + System.arraycopy(payload,0,data,pos,payload.length); + pos+=payload.length; + + //create local data dataPkg = data; return data; } @@ -247,12 +271,21 @@ byte[] uniqueId = new byte[16]; System.arraycopy(data, pos, uniqueId, 0, 16); + pos+=16; + + int pl = XByteBuffer.toInt(data,pos); + pos+=4; + + byte[] payload = new byte[pl]; + System.arraycopy(data, pos, payload, 0, payload.length); + pos+=payload.length; member.domain = domaind; member.setHost(addr); member.setPort(XByteBuffer.toInt(portd, 0)); member.setMemberAliveTime(XByteBuffer.toLong(alived, 0)); member.setUniqueId(uniqueId); + member.payload = payload; member.dataPkg = new byte[data.length]; System.arraycopy(data,0,member.dataPkg,0,data.length); @@ -327,6 +360,10 @@ return uniqueId; } + public byte[] getPayload() { + return payload; + } + public void setMemberAliveTime(long time) { memberAliveTime=time; } @@ -343,7 +380,8 @@ buf.append(getHostname()).append(","); buf.append(port).append(", alive="); buf.append(memberAliveTime).append(","); - buf.append("id=").append(bToS(this.uniqueId)); + buf.append("id=").append(bToS(this.uniqueId)).append(", "); + buf.append("payload=").append(bToS(this.payload)).append(", "); buf.append("]"); return buf.toString(); } @@ -466,6 +504,10 @@ public void setUniqueId(byte[] uniqueId) { this.uniqueId = uniqueId; + } + + public void setPayload(byte[] payload) { + this.payload = payload; } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]