Author: fhanik Date: Tue Feb 28 14:26:50 2006 New Revision: 381798 URL: http://svn.apache.org/viewcvs?rev=381798&view=rev Log: Completed the fragmentation interceptor
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=381798&r1=381797&r2=381798&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java Tue Feb 28 14:26:50 2006 @@ -26,7 +26,7 @@ * Abstract class for the interceptor base class. * @author Filip Hanik * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ - */ + */ public interface ChannelInterceptor extends MembershipListener { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=381798&r1=381797&r2=381798&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java Tue Feb 28 14:26:50 2006 @@ -22,7 +22,7 @@ * @author Filip Hanik * @version $Revision: 303950 $, $Date: 2005-06-09 15:38:30 -0500 (Thu, 09 Jun 2005) $ * - */ + */ public interface ChannelSender { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=381798&r1=381797&r2=381798&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java Tue Feb 28 14:26:50 2006 @@ -23,6 +23,7 @@ import org.apache.catalina.tribes.mcast.McastService; import org.apache.catalina.tribes.group.interceptors.GzipInterceptor; import org.apache.catalina.tribes.group.interceptors.OrderInterceptor; +import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor; /** * <p>Title: </p> @@ -55,7 +56,9 @@ .append("\n\t\t[-mdrop multicastdroptime]") .append("\n\t\t[-gzip]") .append("\n\t\t[-order]") - .append("\n\t\t[-ordersize maxorderqueuesize]"); + .append("\n\t\t[-ordersize maxorderqueuesize]") + .append("\n\t\t[-frag]") + .append("\n\t\t[-fragsize maxmsgsize]"); return buf; } @@ -78,6 +81,8 @@ long mcastdrop = 2000; boolean order = false; int ordersize = Integer.MAX_VALUE; + boolean frag = false; + int fragsize = 1024; for (int i = 0; i < args.length; i++) { if ("-bind".equals(args[i])) { @@ -97,6 +102,11 @@ } else if ("-ordersize".equals(args[i])) { ordersize = Integer.parseInt(args[++i]); System.out.println("Setting OrderInterceptor.maxQueue="+ordersize); + } else if ("-frag".equals(args[i])) { + frag = true; + } else if ("-fragsize".equals(args[i])) { + fragsize = Integer.parseInt(args[++i]); + System.out.println("Setting FragmentationInterceptor.maxSize="+fragsize); } else if ("-ack".equals(args[i])) { ack = Boolean.parseBoolean(args[++i]); } else if ("-ackto".equals(args[i])) { @@ -146,6 +156,11 @@ channel.setMembershipService(service); if (gzip) channel.addInterceptor(new GzipInterceptor()); + if ( frag ) { + FragmentationInterceptor fi = new FragmentationInterceptor(); + fi.setMaxSize(fragsize); + channel.addInterceptor(fi); + } if (order) { OrderInterceptor oi = new OrderInterceptor(); oi.setMaxQueue(ordersize); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java?rev=381798&r1=381797&r2=381798&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java Tue Feb 28 14:26:50 2006 @@ -15,13 +15,200 @@ package org.apache.catalina.tribes.group.interceptors; +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.InterceptorPayload; +import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.io.XByteBuffer; /** * - * + * The fragmentation interceptor splits up large messages into smaller messages and assembles them on the other end. + * This is very useful when you don't want large messages hogging the sending sockets + * and smaller messages can make it through. + * + * <br><b>Configuration Options</b><br> + * OrderInteceptor.expire=<milliseconds> - how long do we keep the fragments in memory and wait for the rest to arrive<b>default=60,000ms -> 60seconds</b> + * This setting is useful to avoid OutOfMemoryErrors<br> + * OrderInteceptor.maxSize=<max message size> - message size in bytes <b>default=1024*100 (around a tenth of a MB)</b><br> * @author Filip Hanik * @version 1.0 */ public class FragmentationInterceptor extends ChannelInterceptorBase { + protected HashMap fragpieces = new HashMap(); + private int maxSize = 1024*100; + private long expire = 1000 * 60; //one minute expiration + + + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + int size = msg.getMessage().getLength(); + boolean frag = (size>maxSize); + if ( frag ) { + frag(destination, msg, payload); + } + else { + byte[] flag = XByteBuffer.toBytes(frag); + msg.getMessage().append(flag,0,flag.length); + super.sendMessage(destination, msg, payload); + } + } + + public void messageReceived(ChannelMessage msg) { + boolean isFrag = XByteBuffer.toBoolean(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-1); + msg.getMessage().trim(1); + if ( isFrag ) { + defrag(msg); + } else { + super.messageReceived(msg); + } + } + + + public synchronized FragCollection getFragCollection(FragKey key, ChannelMessage msg) { + FragCollection coll = (FragCollection)fragpieces.get(key); + if ( coll == null ) { + coll = new FragCollection(msg); + fragpieces.put(key,coll); + } + return coll; + } + + public synchronized void removeFragCollection(FragKey key) { + fragpieces.remove(key); + } + + public void defrag(ChannelMessage msg ) { + FragKey key = new FragKey(msg.getUniqueId()); + FragCollection coll = getFragCollection(key,msg); + coll.addMessage(msg); + + if ( coll.complete() ) { + removeFragCollection(key); + ChannelMessage complete = coll.assemble(); + super.messageReceived(complete); + + } + } + + public void frag(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + int size = msg.getMessage().getLength(); + + int count = ((size / maxSize )+(size%maxSize==0?0:1)); + ChannelMessage[] messages = new ChannelMessage[count]; + int remaining = size; + for ( int i=0; i<count; i++ ) { + ChannelMessage tmp = msg.clone(); + int offset = (i*maxSize); + int length = Math.min(remaining,maxSize); + tmp.getMessage().clear(); + tmp.getMessage().append(msg.getMessage().getBytesDirect(),offset,length); + //add the msg nr + tmp.getMessage().append(XByteBuffer.toBytes(i),0,4); + //add the total nr of messages + tmp.getMessage().append(XByteBuffer.toBytes(count),0,4); + //add true as the frag flag + byte[] flag = XByteBuffer.toBytes(true); + tmp.getMessage().append(flag,0,flag.length); + messages[i] = tmp; + remaining -= length; + + } + for ( int i=0; i<messages.length; i++ ) { + super.sendMessage(destination,messages[i],payload); + } + } + + + + public int getMaxSize() { + return maxSize; + } + + public long getExpire() { + return expire; + } + + public void setMaxSize(int maxSize) { + this.maxSize = maxSize; + } + + public void setExpire(long expire) { + this.expire = expire; + } + + public static class FragCollection { + private long received = System.currentTimeMillis(); + private ChannelMessage msg; + private XByteBuffer[] frags; + public FragCollection(ChannelMessage msg) { + //get the total messages + int count = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4); + frags = new XByteBuffer[count]; + this.msg = msg; + } + + public void addMessage(ChannelMessage msg) { + //remove the total messages + msg.getMessage().trim(4); + //get the msg nr + int nr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4); + //remove the msg nr + msg.getMessage().trim(4); + frags[nr] = msg.getMessage(); + + } + + public boolean complete() { + boolean result = true; + for ( int i=0; (i<frags.length) && (result); i++ ) result = (frags[i] != null); + return result; + } + + public ChannelMessage assemble() { + if ( !complete() ) throw new IllegalStateException("Fragments are missing."); + int buffersize = 0; + for (int i=0; i<frags.length; i++ ) buffersize += frags[i].getLength(); + XByteBuffer buf = new XByteBuffer(buffersize,false); + msg.setMessage(buf); + for ( int i=0; i<frags.length; i++ ) { + msg.getMessage().append(frags[i].getBytesDirect(),0,frags[i].getLength()); + } + return msg; + } + + public boolean expired(long expire) { + return (System.currentTimeMillis()-received)>expire; + } + + + + } + + public static class FragKey { + private byte[] uniqueId; + private long received = System.currentTimeMillis(); + public FragKey(byte[] id ) { + this.uniqueId = id; + } + public int hashCode() { + return XByteBuffer.toInt(uniqueId,0); + } + + public boolean equals(Object o ) { + if ( o instanceof FragKey ) { + return Arrays.equals(uniqueId,((FragKey)o).uniqueId); + } else return false; + + } + + public boolean expired(long expire) { + return (System.currentTimeMillis()-received)>expire; + } + + } + } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381798&r1=381797&r2=381798&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Tue Feb 28 14:26:50 2006 @@ -331,6 +331,22 @@ ( ( ( (long) b[off+0]) & 0xFF) << 56); } + + /** + * Converts an integer to four bytes + * @param n - the integer + * @return - four bytes in an array + */ + public static byte[] toBytes(boolean bool) { + byte[] b = new byte[] {(byte)(bool?1:0)}; + return b; + } + + public static boolean toBoolean(byte[] b, int offset) { + return b[offset] != 0; + } + + /** * Converts an integer to four bytes * @param n - the integer --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]