Author: fhanik Date: Tue Feb 28 08:51:07 2006 New Revision: 381703 URL: http://svn.apache.org/viewcvs?rev=381703&view=rev Log: Fixed buffer usage
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=381703&r1=381702&r2=381703&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java Tue Feb 28 08:51:07 2006 @@ -22,6 +22,7 @@ import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.mcast.McastService; import org.apache.catalina.tribes.group.interceptors.GzipInterceptor; +import org.apache.catalina.tribes.group.interceptors.OrderInterceptor; /** * <p>Title: </p> @@ -53,6 +54,7 @@ .append("\n\t\t[-mfreq multicastfrequency]") .append("\n\t\t[-mdrop multicastdroptime]") .append("\n\t\t[-gzip]") + .append("\n\t\t[-order]") ; return buf; @@ -74,6 +76,7 @@ int mcastport = 45565; long mcastfreq = 500; long mcastdrop = 2000; + boolean order = false; for (int i = 0; i < args.length; i++) { if ("-bind".equals(args[i])) { @@ -88,6 +91,8 @@ tcpthreadcount = Integer.parseInt(args[++i]); } else if ("-gzip".equals(args[i])) { gzip = true; + } else if ("-order".equals(args[i])) { + order = true; } else if ("-ack".equals(args[i])) { ack = Boolean.parseBoolean(args[++i]); } else if ("-ackto".equals(args[i])) { @@ -137,6 +142,7 @@ channel.setMembershipService(service); if (gzip) channel.addInterceptor(new GzipInterceptor()); + if (order) channel.addInterceptor(new OrderInterceptor()); return channel; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381703&r1=381702&r2=381703&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java Tue Feb 28 08:51:07 2006 @@ -121,12 +121,13 @@ public void run() { long counter = 0; + long total = 0; LoadMessage msg = new LoadMessage(); int messageSize = LoadTest.messageSize; try { startTest(); - while (counter < msgCount) { + while (total < msgCount) { if (channel.getMembers().length == 0 || (!send)) { synchronized (mutex) { try { @@ -137,7 +138,8 @@ } } else { try { - msg.setMsgNr((int)++counter); + msg.setMsgNr((int)++total); + counter++; if (debug) { printArray(msg.getMessage()); } @@ -334,6 +336,7 @@ threads = Integer.parseInt(args[++i]); } else if ("-count".equals(args[i])) { count = Integer.parseInt(args[++i]); + System.out.println("Sending "+count+" messages."); } else if ("-pause".equals(args[i])) { pause = Long.parseLong(args[++i])*1000; } else if ("-break".equals(args[i])) { @@ -345,7 +348,7 @@ if ( "receive".equals(args[++i]) ) send = false; } else if ("-debug".equals(args[i])) { debug = true; - } else //("-help".equals(args[i])) + } else if ("-help".equals(args[i])) { usage(); System.exit(1); @@ -357,6 +360,7 @@ LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx); LoadMessage msg = new LoadMessage(); + messageSize = LoadMessage.getMessageSize(msg); channel.setChannelListener(test); channel.setMembershipListener(test); @@ -383,6 +387,9 @@ public void run() { System.out.println("Shutting down..."); + SystemExit exit = new SystemExit(5000); + exit.setDaemon(true); + exit.start(); try { channel.stop(channel.DEFAULT); @@ -390,6 +397,21 @@ x.printStackTrace(); } System.out.println("Channel stopped."); + } + } + public static class SystemExit extends Thread { + private long delay; + public SystemExit(long delay) { + this.delay = delay; + } + public void run () { + try { + Thread.sleep(delay); + }catch ( Exception x ) { + x.printStackTrace(); + } + System.exit(0); + } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381703&r1=381702&r2=381703&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Tue Feb 28 08:51:07 2006 @@ -91,6 +91,7 @@ public void send(Member[] destination, Serializable msg) throws ChannelException { if ( msg == null ) return; try { + if ( destination == null ) destination = getMembers(); int options = 0; ClusterData data = new ClusterData();//generates a unique Id data.setAddress(getLocalMember()); @@ -103,7 +104,7 @@ b = XByteBuffer.serialize(msg); } data.setOptions(options); - XByteBuffer buffer = new XByteBuffer(b.length+128); + XByteBuffer buffer = new XByteBuffer(b.length+128,false); buffer.append(b,0,b.length); data.setMessage(buffer); getFirstInterceptor().sendMessage(destination, data, null); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381703&r1=381702&r2=381703&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Tue Feb 28 08:51:07 2006 @@ -37,7 +37,7 @@ private HashMap incounter = new HashMap(); private HashMap incoming = new HashMap(); private long expire = 3000; - private boolean forwardExpired; + private boolean forwardExpired = true; public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { for ( int i=0; i<destination.length; i++ ) { @@ -77,6 +77,7 @@ while ( tmp != null ) { //process expired messages if ( tmp.isExpired(expire) ) { + System.out.println("Found expired message"); //reset the head if ( tmp == head ) head = tmp.next; if ( getForwardExpired() ) super.messageReceived(tmp.getMessage()); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=381703&r1=381702&r2=381703&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Tue Feb 28 08:51:07 2006 @@ -198,7 +198,7 @@ System.arraycopy(b,offset,addr,0,addr.length); data.setAddress(McastMember.getMember(addr)); offset += addr.length; //addr data - data.message = new XByteBuffer(new byte[XByteBuffer.toInt(b,offset)]); + data.message = new XByteBuffer(new byte[XByteBuffer.toInt(b,offset)],false); offset += 4; //message length System.arraycopy(b,offset,data.message.getBytesDirect(),0,data.message.getLength()); offset += data.message.getLength(); //message data Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=381703&r1=381702&r2=381703&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Tue Feb 28 08:51:07 2006 @@ -52,7 +52,7 @@ public ObjectReader(SocketChannel channel, Selector selector, ListenCallback callback) { this.channel = channel; this.callback = callback; - this.buffer = new XByteBuffer(); + this.buffer = new XByteBuffer(true); } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381703&r1=381702&r2=381703&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Tue Feb 28 08:51:07 2006 @@ -85,30 +85,38 @@ protected int bufSize = 0; /** + * + * + */ + protected boolean discard = true; + + /** * Constructs a new XByteBuffer * @param size - the initial size of the byte buffer */ - public XByteBuffer(int size) { + public XByteBuffer(int size, boolean discard) { buf = new byte[size]; + this.discard = discard; } - public XByteBuffer(byte[] data) { - this(data,data.length+128); + public XByteBuffer(byte[] data,boolean discard) { + this(data,data.length+128,discard); } - public XByteBuffer(byte[] data, int size) { + public XByteBuffer(byte[] data, int size,boolean discard) { int length = Math.max(data.length,size); buf = new byte[length]; System.arraycopy(data,0,buf,0,data.length); bufSize = data.length; + this.discard = discard; } /** * Constructs a new XByteBuffer with an initial size of 1024 bytes */ - public XByteBuffer() { - this(DEF_SIZE); + public XByteBuffer(boolean discard) { + this(DEF_SIZE,discard); } public int getLength() { @@ -160,11 +168,13 @@ b.get(buf,bufSize,len); bufSize = newcount; - - if (bufSize > START_DATA.length && (firstIndexOf(buf,0,START_DATA)==-1)){ - bufSize = 0; - log.error("Discarded the package, invalid header"); - return false; + + if ( discard ) { + if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) { + bufSize = 0; + log.error("Discarded the package, invalid header"); + return false; + } } return true; @@ -188,10 +198,12 @@ System.arraycopy(b, off, buf, bufSize, len); bufSize = newcount; - if (bufSize > START_DATA.length && (firstIndexOf(buf,0,START_DATA)==-1)){ - bufSize = 0; - log.error("Discarded the package, invalid header"); - return false; + if ( discard ) { + if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) { + bufSize = 0; + log.error("Discarded the package, invalid header"); + return false; + } } return true; } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]