Author: fhanik Date: Tue Feb 28 08:14:20 2006 New Revision: 381687 URL: http://svn.apache.org/viewcvs?rev=381687&view=rev Log: Added order protocol, created a command line channel creator
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Removed: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java Tue Feb 28 08:14:20 2006 @@ -16,6 +16,7 @@ package org.apache.catalina.tribes; import java.io.Serializable; +import org.apache.catalina.tribes.io.XByteBuffer; /** * @author Filip Hanik @@ -61,11 +62,13 @@ */ public byte[] getUniqueId(); - public void setMessage(byte[] data); + public void setMessage(XByteBuffer buf); - public byte[] getMessage(); + public XByteBuffer getMessage(); public int getOptions(); public void setOptions(int options); + + public ChannelMessage clone(); } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java Tue Feb 28 08:14:20 2006 @@ -69,6 +69,8 @@ */ public boolean hasMembers(); + + public Member getMember(Member mbr); /** * Returns a list of all the members in the cluster. */ Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java Tue Feb 28 08:14:20 2006 @@ -1,3 +1,18 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.catalina.tribes.demos; import org.apache.catalina.tribes.Member; @@ -288,20 +303,15 @@ System.out.println("Usage:\n\t"+ "java LoadTest [options]\n\t"+ "Options:\n\t\t"+ - "[-bind tcpbindaddress] \n\t\t"+ - "[-port tcplistenport] \n\t\t"+ - "[-mbind multicastbindaddr] \n\t\t"+ "[-mode receive|send|both] \n\t\t"+ "[-debug] \n\t\t"+ "[-count messagecount] \n\t\t"+ "[-stats statinterval] \n\t\t"+ - "[-ack true|false] \n\t\t"+ - "[-sync true|false] \n\t\t"+ - "[-gzip] \n\t\t"+ "[-pause nrofsecondstopausebetweensends] \n\t\t"+ - "[-sender pooled|fastasyncqueue] \n\t\t"+ "[-threads numberofsenderthreads] \n\t\t"+ "[-break (halts execution on exception)]\n"+ + "\tChannel options:"+ + ChannelCreator.usage()+"\n\n"+ "Example:\n\t"+ "java LoadTest -port 4004\n\t"+ "java LoadTest -bind 192.168.0.45 -port 4005\n\t"+ @@ -309,49 +319,28 @@ } public static void main(String[] args) throws Exception { - String bind = "auto"; - int port = 4001; boolean send = true; - String mbind = null; boolean debug = false; - boolean ack = true; - boolean sync = true; - boolean gzip = false; long pause = 0; int count = 1000000; int stats = 10000; boolean breakOnEx = false; int threads = 1; - String sender = "pooled"; if ( args.length == 0 ) { args = new String[] {"-help"}; } for (int i = 0; i < args.length; i++) { - if ("-bind".equals(args[i])) { - bind = args[++i]; - } else if ("-sender".equals(args[i])) { - sender = args[++i]; - } else if ("-port".equals(args[i])) { - port = Integer.parseInt(args[++i]); - } else if ("-threads".equals(args[i])) { + if ("-threads".equals(args[i])) { threads = Integer.parseInt(args[++i]); } else if ("-count".equals(args[i])) { count = Integer.parseInt(args[++i]); } else if ("-pause".equals(args[i])) { pause = Long.parseLong(args[++i])*1000; - } else if ("-gzip".equals(args[i])) { - gzip = true; } else if ("-break".equals(args[i])) { breakOnEx = true; - } else if ("-ack".equals(args[i])) { - ack = Boolean.parseBoolean(args[++i]); - } else if ("-sync".equals(args[i])) { - sync = Boolean.parseBoolean(args[++i]); } else if ("-stats".equals(args[i])) { stats = Integer.parseInt(args[++i]); System.out.println("Stats every "+stats+" message"); - } else if ("-mbind".equals(args[i])) { - mbind = args[++i]; } else if ("-mode".equals(args[i])) { if ( "receive".equals(args[++i]) ) send = false; } else if ("-debug".equals(args[i])) { @@ -364,34 +353,7 @@ } - ReplicationListener rl = new ReplicationListener(); - rl.setTcpListenAddress(bind); - rl.setTcpListenPort(port); - rl.setTcpSelectorTimeout(100); - rl.setTcpThreadCount(4); - rl.getBind(); - rl.setSendAck(ack); - rl.setSynchronized(sync); - - ReplicationTransmitter ps = new ReplicationTransmitter(); - ps.setReplicationMode(sender); - ps.setAckTimeout(15000); - ps.setAutoConnect(true); - ps.setWaitForAck(ack); - - McastService service = new McastService(); - service.setMcastAddr("228.0.0.5"); - if ( mbind != null ) service.setMcastBindAddress(mbind); - service.setMcastFrequency(500); - service.setMcastDropTime(2000); - service.setMcastPort(45565); - - ManagedChannel channel = new GroupChannel(); - channel.setChannelReceiver(rl); - channel.setChannelSender(ps); - channel.setMembershipService(service); - - if ( gzip ) channel.addInterceptor(new GzipInterceptor()); + ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args); LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx); LoadMessage msg = new LoadMessage(); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Tue Feb 28 08:14:20 2006 @@ -22,9 +22,7 @@ import org.apache.catalina.tribes.ChannelSender; import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.Channel; -import java.io.IOException; import org.apache.catalina.tribes.InterceptorPayload; -import org.apache.catalina.tribes.io.ClusterData; import org.apache.catalina.tribes.MessageListener; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Tue Feb 28 08:14:20 2006 @@ -103,8 +103,9 @@ b = XByteBuffer.serialize(msg); } data.setOptions(options); - - data.setMessage(b); + XByteBuffer buffer = new XByteBuffer(b.length+128); + buffer.append(b,0,b.length); + data.setMessage(buffer); getFirstInterceptor().sendMessage(destination, data, null); }catch ( Exception x ) { if ( x instanceof ChannelException ) throw (ChannelException)x; @@ -118,9 +119,9 @@ Serializable fwd = null; if ( (msg.getOptions() & BYTE_MESSAGE) == BYTE_MESSAGE ) { - fwd = new ByteMessage(msg.getMessage()); + fwd = new ByteMessage(msg.getMessage().getBytes()); } else { - fwd = XByteBuffer.deserialize(msg.getMessage()); + fwd = XByteBuffer.deserialize(msg.getMessage().getBytes()); } if ( channelListener != null && channelListener.accept(fwd,msg.getAddress())) channelListener.messageReceived(fwd,msg.getAddress()); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java Tue Feb 28 08:14:20 2006 @@ -41,7 +41,9 @@ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { try { - msg.setMessage(compress(msg.getMessage())); + byte[] data = compress(msg.getMessage().getBytes()); + msg.getMessage().trim(msg.getMessage().getLength()); + msg.getMessage().append(data,0,data.length); getNext().sendMessage(destination, msg, payload); } catch ( IOException x ) { log.error("Unable to compress byte contents"); @@ -51,7 +53,9 @@ public void messageReceived(ChannelMessage msg) { try { - msg.setMessage(decompress(msg.getMessage())); + byte[] data = decompress(msg.getMessage().getBytes()); + msg.getMessage().trim(msg.getMessage().getLength()); + msg.getMessage().append(data,0,data.length); getPrevious().messageReceived(msg); } catch ( IOException x ) { log.error("Unable to decompress byte contents",x); Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381687&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Tue Feb 28 08:14:20 2006 @@ -0,0 +1,231 @@ +/* + * Copyright 1999,2004 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + */ + +package org.apache.catalina.tribes.group.interceptors; + +import java.util.HashMap; + +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.InterceptorPayload; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.io.XByteBuffer; + + + +/** + * + * + * @author Filip Hanik + * @version 1.0 + */ +public class OrderInterceptor extends ChannelInterceptorBase { + private HashMap outcounter = new HashMap(); + private HashMap incounter = new HashMap(); + private HashMap incoming = new HashMap(); + private long expire = 3000; + private boolean forwardExpired; + + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + for ( int i=0; i<destination.length; i++ ) { + ChannelMessage tmp = msg.clone(); + int nr = incCounter(destination[i]); + tmp.getMessage().append(XByteBuffer.toBytes(nr),0,4); + getNext().sendMessage(new Member[] {destination[i]}, tmp, payload); + } + } + + public void messageReceived(ChannelMessage msg) { + int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4); + msg.getMessage().trim(4); + MessageOrder order = new MessageOrder(msgnr,msg); + processIncoming(order); + //getPrevious().messageReceived(msg); + } + + public synchronized void processIncoming(MessageOrder order) { + Member member = order.getMessage().getAddress(); + Counter cnt = getInCounter(member); + + MessageOrder tmp = (MessageOrder)incoming.get(member); + if ( tmp != null ) { + order = MessageOrder.add(tmp,order); + } + while ( (order!=null) && (order.getMsgNr() == cnt.getCounter()) ) { + //we are right on target. process orders + cnt.inc(); + super.messageReceived(order.getMessage()); + order.setMessage(null); + order = order.next; + } + MessageOrder head = order; + MessageOrder prev = null; + tmp = order; + while ( tmp != null ) { + //process expired messages + if ( tmp.isExpired(expire) ) { + //reset the head + if ( tmp == head ) head = tmp.next; + if ( getForwardExpired() ) super.messageReceived(tmp.getMessage()); + tmp.setMessage(null); + tmp = tmp.next; + if ( prev != null ) prev.next = tmp; + } else { + prev = tmp; + } + } + if ( head == null ) incoming.remove(member); + else incoming.put(member,head); + } + + public void memberAdded(Member member) { + //notify upwards + getInCounter(member); + getOutCounter(member); + super.memberAdded(member); + } + + public void memberDisappeared(Member member) { + //notify upwards + outcounter.remove(member); + incounter.remove(member); + super.memberDisappeared(member); + } + + public int incCounter(Member mbr) { + Counter cnt = getOutCounter(mbr); + return cnt.inc(); + } + + public synchronized Counter getInCounter(Member mbr) { + Counter cnt = (Counter)incounter.get(mbr); + if ( cnt == null ) { + cnt = new Counter(); + cnt.inc(); //always start at 1 for incoming + incounter.put(mbr,cnt); + } + return cnt; + } + + public synchronized Counter getOutCounter(Member mbr) { + Counter cnt = (Counter)outcounter.get(mbr); + if ( cnt == null ) { + cnt = new Counter(); + outcounter.put(mbr,cnt); + } + return cnt; + } + + public static class Counter { + private int value = 0; + + public int getCounter() { + return value; + } + + public synchronized int inc() { + return ++value; + } + } + + public static class MessageOrder { + private long received = System.currentTimeMillis(); + private MessageOrder next; + private int msgNr; + private ChannelMessage msg = null; + public MessageOrder(int msgNr,ChannelMessage msg) { + this.msgNr = msgNr; + this.msg = msg; + } + + public boolean isExpired(long expireTime) { + return (System.currentTimeMillis()-received) > expireTime; + } + + public ChannelMessage getMessage() { + return msg; + } + + public void setMessage(ChannelMessage msg) { + this.msg = msg; + } + + public void setNext(MessageOrder order) { + this.next = order; + } + public MessageOrder getNext() { + return next; + } + + public static MessageOrder add(MessageOrder head, MessageOrder add) { + if ( head == null ) return add; + if ( add == null ) return head; + if ( head == add ) return add; + + if ( head.getMsgNr() > add.getMsgNr() ) { + //add before + MessageOrder tmp = add.next; + add.next = head; + head.next = tmp; + return add; + } + + MessageOrder iter = head; + MessageOrder prev = null; + while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) { + prev = iter; + iter = iter.next; + } + if ( iter.getMsgNr() < add.getMsgNr() ) { + //add after + add.next = iter.next; + iter.next = add; + } else if (iter.getMsgNr() > add.getMsgNr()) { + //add before + prev.next = add; + add.next = iter; + + } else { + throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor"); + } + return head; + } + + public int getMsgNr() { + return msgNr; + } + + + + } + + public void setExpire(long expire) { + this.expire = expire; + } + + public void setForwardExpired(boolean forwardExpired) { + this.forwardExpired = forwardExpired; + } + + public long getExpire() { + return expire; + } + + public boolean getForwardExpired() { + return forwardExpired; + } + +} Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Tue Feb 28 08:14:20 2006 @@ -39,7 +39,7 @@ public class ClusterData implements ChannelMessage { private int options = 0 ; - private byte[] message ; + private XByteBuffer message ; private long timestamp ; private byte[] uniqueId ; private Member address; @@ -60,7 +60,7 @@ * @param message message data * @param timestamp message creation date */ - public ClusterData(byte[] uniqueId, byte[] message, long timestamp) { + public ClusterData(byte[] uniqueId, XByteBuffer message, long timestamp) { this.uniqueId = uniqueId; this.message = message; this.timestamp = timestamp; @@ -69,13 +69,13 @@ /** * @return Returns the message. */ - public byte[] getMessage() { + public XByteBuffer getMessage() { return message; } /** * @param message The message to set. */ - public void setMessage(byte[] message) { + public void setMessage(XByteBuffer message) { this.message = message; } /** @@ -150,7 +150,7 @@ * @return byte[] */ - public byte[] getDataPackage() throws IOException { + public byte[] getDataPackage() { byte[] addr = ((McastMember)address).getData(); int length = 4 + //options @@ -160,7 +160,7 @@ 4 + //addr length off=12+uniqueId.length+4 addr.length+ //member data off=12+uniqueId.length+4+add.length 4 + //message length off=12+uniqueId.length+4+add.length+4 - message.length; + message.getLength(); byte[] data = new byte[length]; int offset = 0; XByteBuffer.toBytes(options,data,offset); @@ -175,14 +175,14 @@ offset += 4; //addr.length System.arraycopy(addr,0,data,offset,addr.length); offset += addr.length; //addr data - XByteBuffer.toBytes(message.length,data,offset); + XByteBuffer.toBytes(message.getLength(),data,offset); offset += 4; //message.length - System.arraycopy(message,0,data,offset,message.length); - offset += message.length; //message data + System.arraycopy(message.getBytesDirect(),0,data,offset,message.getLength()); + offset += message.getLength(); //message data return data; } - public static ClusterData getDataFromPackage(byte[] b) throws IOException { + public static ClusterData getDataFromPackage(byte[] b) { ClusterData data = new ClusterData(false); int offset = 0; data.setOptions(XByteBuffer.toInt(b,offset)); @@ -198,30 +198,26 @@ System.arraycopy(b,offset,addr,0,addr.length); data.setAddress(McastMember.getMember(addr)); offset += addr.length; //addr data - data.message = new byte[XByteBuffer.toInt(b,offset)]; + data.message = new XByteBuffer(new byte[XByteBuffer.toInt(b,offset)]); offset += 4; //message length - System.arraycopy(b,offset,data.message,0,data.message.length); - offset += data.message.length; //message data + System.arraycopy(b,offset,data.message.getBytesDirect(),0,data.message.getLength()); + offset += data.message.getLength(); //message data return data; } - public static void main(String[] args) throws Exception { - ClusterData data1 = new ClusterData(); - data1.setAddress(new McastMember("domain","127.0.0.1",1000,System.currentTimeMillis())); - data1.setMessage(new byte[1024]); - - byte[] b = data1.getDataPackage(); - - ClusterData data2 = ClusterData.getDataFromPackage(b); - - if ( !(data1.getAddress().equals(data2.getAddress())) || - !(Arrays.equals(data1.getMessage(),data2.getMessage())) || - !(Arrays.equals(data1.getUniqueId(),data2.getUniqueId())) || - !(data1.getTimestamp() == data2.timestamp) || - !(data1.getOptions() == data2.getOptions() ) ) { - throw new Exception("Not Equal"); - } - + public int hashCode() { + return XByteBuffer.toInt(getUniqueId(),0); + } + + public boolean equals(Object o) { + if ( o instanceof ClusterData ) { + return Arrays.equals(getUniqueId(),((ClusterData)o).getUniqueId()); + } else return false; + } + + public ClusterData clone() { + byte[] d = this.getDataPackage(); + return ClusterData.getDataFromPackage(d); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Tue Feb 28 08:14:20 2006 @@ -91,6 +91,18 @@ public XByteBuffer(int size) { buf = new byte[size]; } + + public XByteBuffer(byte[] data) { + this(data,data.length+128); + } + + public XByteBuffer(byte[] data, int size) { + int length = Math.max(data.length,size); + buf = new byte[length]; + System.arraycopy(data,0,buf,0,data.length); + bufSize = data.length; + } + /** * Constructs a new XByteBuffer with an initial size of 1024 bytes @@ -98,6 +110,20 @@ public XByteBuffer() { this(DEF_SIZE); } + + public int getLength() { + return bufSize; + } + + public void trim(int length) { + if ( (bufSize - length) < 0 ) + throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length); + bufSize -= length; + } + + public byte[] getBytesDirect() { + return this.buf; + } /** * Returns the bytes in the buffer, in its exact length @@ -415,11 +441,6 @@ ByteArrayOutputStream outs = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(outs); out.writeObject(msg); - // flush out the gzip stream to byte buffer -// if(out != null) { -// out.flush(); -// out.close(); -// } byte[] data = outs.toByteArray(); return data; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Tue Feb 28 08:14:20 2006 @@ -135,7 +135,7 @@ * @return - the bytes for this member deserialized * @throws Exception */ - public byte[] getData() throws IOException { + public byte[] getData() { //package looks like //alive - 8 bytes //port - 4 bytes Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java Tue Feb 28 08:14:20 2006 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import org.apache.catalina.tribes.Member; /** * A <b>membership</b> implementation using simple multicast. @@ -190,6 +191,19 @@ */ public synchronized boolean hasMembers() { return members.length > 0 ; + } + + + public synchronized McastMember getMember(Member mbr) { + if(hasMembers()) { + McastMember result = null; + for ( int i=0; i<this.members.length && result==null; i++ ) { + if ( members[i].equals(mbr) ) result = members[i]; + }//for + return result; + } else { + return null; + } } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java Tue Feb 28 08:14:20 2006 @@ -352,6 +352,11 @@ if ( impl == null || impl.membership == null ) return false; return impl.membership.hasMembers(); } + + public Member getMember(Member mbr) { + if ( impl == null || impl.membership == null ) return null; + return impl.membership.getMember(mbr); + } /** * Return all the members Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java Tue Feb 28 08:14:20 2006 @@ -166,12 +166,12 @@ synchronized (this) { inQueueCounter++; if(queueThread != null) - queueThread.incQueuedNrOfBytes(data.getMessage().length); + queueThread.incQueuedNrOfBytes(data.getMessage().getLength()); } if (log.isTraceEnabled()) log.trace(sm.getString("AsyncSocketSender.queue.message", getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long( - data.getMessage().length))); + data.getMessage().getLength()))); } /* @@ -268,7 +268,7 @@ int messagesize = 0; try { ChannelMessage data = (ChannelMessage) entry.getValue(); - messagesize = data.getMessage().length; + messagesize = data.getMessage().getLength(); sender.pushMessage(data); } catch (Exception x) { log.warn(sm.getString("AsyncSocketSender.send.error", Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Tue Feb 28 08:14:20 2006 @@ -827,8 +827,8 @@ checkKeepAlive(); if(doProcessingStats) addProcessingStats(time); if(messageTransfered) { - addStats(data.getMessage().length); - if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().length))); + addStats(data.getMessage().getLength()); + if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().getLength()))); } else { dataFailureCounter++; if ( exception != null ) throw exception; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java Tue Feb 28 08:14:20 2006 @@ -319,12 +319,12 @@ synchronized (this) { inQueueCounter++; if(queueThread != null) - queueThread.incQueuedNrOfBytes(data.getMessage().length); + queueThread.incQueuedNrOfBytes(data.getMessage().getLength()); } if (log.isTraceEnabled()) log.trace(sm.getString("AsyncSocketSender.queue.message", getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long( - data.getMessage().length))); + data.getMessage().getLength()))); } /** @@ -480,7 +480,7 @@ int messagesize = 0; try { ChannelMessage data = (ChannelMessage) entry.data(); - messagesize = data.getMessage().length; + messagesize = data.getMessage().getLength(); sender.pushMessage(data); } catch (Exception x) { log.warn(sm.getString( Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java Tue Feb 28 08:14:20 2006 @@ -132,7 +132,7 @@ //return the connection to the pool senderQueue.returnSender(sender); } - addStats(data.getMessage().length); + addStats(data.getMessage().getLength()); } public String toString() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=381687&r1=381686&r2=381687&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Tue Feb 28 08:14:20 2006 @@ -602,7 +602,7 @@ } sender.sendMessage(data); sender.setSuspect(false); - addStats(data.getMessage().length); + addStats(data.getMessage().getLength()); } catch (IOException x) { if (!sender.getSuspect()) { if (log.isErrorEnabled() ) log.error("Unable to send replicated message, is member ["+sender.toString()+"] down?",x); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]