Author: fhanik
Date: Tue Feb 28 14:26:50 2006
New Revision: 381798

URL: http://svn.apache.org/viewcvs?rev=381798&view=rev
Log:
Completed the fragmentation interceptor

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
 Tue Feb 28 14:26:50 2006
@@ -26,7 +26,7 @@
  * Abstract class for the interceptor base class.
  * @author Filip Hanik
  * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 
2005) $
- */
+ */   
 
 public interface ChannelInterceptor extends MembershipListener {
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
 Tue Feb 28 14:26:50 2006
@@ -22,7 +22,7 @@
  * @author Filip Hanik
  * @version $Revision: 303950 $, $Date: 2005-06-09 15:38:30 -0500 (Thu, 09 Jun 
2005) $
  *
- */
+ */    
 public interface ChannelSender
 {
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
 Tue Feb 28 14:26:50 2006
@@ -23,6 +23,7 @@
 import org.apache.catalina.tribes.mcast.McastService;
 import org.apache.catalina.tribes.group.interceptors.GzipInterceptor;
 import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor;
 
 /**
  * <p>Title: </p>
@@ -55,7 +56,9 @@
            .append("\n\t\t[-mdrop multicastdroptime]")
            .append("\n\t\t[-gzip]")
            .append("\n\t\t[-order]")
-           .append("\n\t\t[-ordersize maxorderqueuesize]");
+           .append("\n\t\t[-ordersize maxorderqueuesize]")
+           .append("\n\t\t[-frag]")
+           .append("\n\t\t[-fragsize maxmsgsize]");
        return buf;
 
     }
@@ -78,6 +81,8 @@
         long mcastdrop = 2000;
         boolean order = false;
         int ordersize = Integer.MAX_VALUE;
+        boolean frag = false;
+        int fragsize = 1024;
         
         for (int i = 0; i < args.length; i++) {
             if ("-bind".equals(args[i])) {
@@ -97,6 +102,11 @@
             } else if ("-ordersize".equals(args[i])) {
                 ordersize = Integer.parseInt(args[++i]);
                 System.out.println("Setting 
OrderInterceptor.maxQueue="+ordersize);
+            } else if ("-frag".equals(args[i])) {
+                frag = true;
+            } else if ("-fragsize".equals(args[i])) {
+                fragsize = Integer.parseInt(args[++i]);
+                System.out.println("Setting 
FragmentationInterceptor.maxSize="+fragsize);
             } else if ("-ack".equals(args[i])) {
                 ack = Boolean.parseBoolean(args[++i]);
             } else if ("-ackto".equals(args[i])) {
@@ -146,6 +156,11 @@
         channel.setMembershipService(service);
 
         if (gzip) channel.addInterceptor(new GzipInterceptor());
+        if ( frag ) {
+            FragmentationInterceptor fi = new FragmentationInterceptor();
+            fi.setMaxSize(fragsize);
+            channel.addInterceptor(fi);
+        }
         if (order) {
             OrderInterceptor oi = new OrderInterceptor();
             oi.setMaxQueue(ordersize);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
 Tue Feb 28 14:26:50 2006
@@ -15,13 +15,200 @@
 
 package org.apache.catalina.tribes.group.interceptors;
 
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.InterceptorPayload;
+import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.io.XByteBuffer;
 
 /**
  *
- *
+ * The fragmentation interceptor splits up large messages into smaller 
messages and assembles them on the other end.
+ * This is very useful when you don't want large messages hogging the sending 
sockets
+ * and smaller messages can make it through.
+ * 
+ * <br><b>Configuration Options</b><br>
+ * OrderInteceptor.expire=<milliseconds> - how long do we keep the fragments 
in memory and wait for the rest to arrive<b>default=60,000ms -> 60seconds</b>
+ * This setting is useful to avoid OutOfMemoryErrors<br>
+ * OrderInteceptor.maxSize=<max message size> - message size in bytes 
<b>default=1024*100 (around a tenth of a MB)</b><br>
  * @author Filip Hanik
  * @version 1.0
  */
 public class FragmentationInterceptor extends ChannelInterceptorBase {
+    protected HashMap fragpieces = new HashMap();
+    private int maxSize = 1024*100;
+    private long expire = 1000 * 60; //one minute expiration
+
+
+    public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
+        int size = msg.getMessage().getLength();
+        boolean frag = (size>maxSize);
+        if ( frag ) {
+            frag(destination, msg, payload);
+        }
+        else {
+            byte[] flag = XByteBuffer.toBytes(frag);
+            msg.getMessage().append(flag,0,flag.length);
+            super.sendMessage(destination, msg, payload);
+        }
+    }
+    
+    public void messageReceived(ChannelMessage msg) {
+        boolean isFrag = 
XByteBuffer.toBoolean(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-1);
+        msg.getMessage().trim(1);
+        if ( isFrag ) {
+            defrag(msg);
+        } else {
+            super.messageReceived(msg);
+        }
+    }
+
+    
+    public synchronized FragCollection getFragCollection(FragKey key, 
ChannelMessage msg) {
+        FragCollection coll = (FragCollection)fragpieces.get(key);
+        if ( coll == null ) {
+            coll = new FragCollection(msg);
+            fragpieces.put(key,coll);
+        } 
+        return coll;
+    }
+    
+    public synchronized void removeFragCollection(FragKey key) {
+        fragpieces.remove(key);
+    }
+    
+    public void defrag(ChannelMessage msg ) { 
+        FragKey key = new FragKey(msg.getUniqueId());
+        FragCollection coll = getFragCollection(key,msg);
+        coll.addMessage(msg);
+
+        if ( coll.complete() ) {
+            removeFragCollection(key);
+            ChannelMessage complete = coll.assemble();
+            super.messageReceived(complete);
+            
+        }
+    }
+
+    public void frag(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
+        int size = msg.getMessage().getLength();
+
+        int count = ((size / maxSize )+(size%maxSize==0?0:1));
+        ChannelMessage[] messages = new ChannelMessage[count];
+        int remaining = size;
+        for ( int i=0; i<count; i++ ) {
+            ChannelMessage tmp = msg.clone();
+            int offset = (i*maxSize);
+            int length = Math.min(remaining,maxSize);
+            tmp.getMessage().clear();
+            
tmp.getMessage().append(msg.getMessage().getBytesDirect(),offset,length);
+            //add the msg nr
+            tmp.getMessage().append(XByteBuffer.toBytes(i),0,4);
+            //add the total nr of messages
+            tmp.getMessage().append(XByteBuffer.toBytes(count),0,4);
+            //add true as the frag flag
+            byte[] flag = XByteBuffer.toBytes(true);
+            tmp.getMessage().append(flag,0,flag.length);
+            messages[i] = tmp;
+            remaining -= length;
+            
+        }
+        for ( int i=0; i<messages.length; i++ ) {
+            super.sendMessage(destination,messages[i],payload);
+        }
+    }
+
+    
+
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+    public long getExpire() {
+        return expire;
+    }
+
+    public void setMaxSize(int maxSize) {
+        this.maxSize = maxSize;
+    }
+
+    public void setExpire(long expire) {
+        this.expire = expire;
+    }
+
+    public static class FragCollection {
+        private long received = System.currentTimeMillis();
+        private ChannelMessage msg;
+        private XByteBuffer[] frags;
+        public FragCollection(ChannelMessage msg) {
+            //get the total messages
+            int count = 
XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+            frags = new XByteBuffer[count];
+            this.msg = msg;
+        }
+        
+        public void addMessage(ChannelMessage msg) {
+            //remove the total messages
+            msg.getMessage().trim(4);
+            //get the msg nr
+            int nr = 
XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+            //remove the msg nr
+            msg.getMessage().trim(4);
+            frags[nr] = msg.getMessage();
+            
+        }
+        
+        public boolean complete() {
+            boolean result = true;
+            for ( int i=0; (i<frags.length) && (result); i++ ) result = 
(frags[i] != null);
+            return result;
+        }
+        
+        public ChannelMessage assemble() {
+            if ( !complete() ) throw new IllegalStateException("Fragments are 
missing.");
+            int buffersize = 0;
+            for (int i=0; i<frags.length; i++ ) buffersize += 
frags[i].getLength();
+            XByteBuffer buf = new XByteBuffer(buffersize,false);
+            msg.setMessage(buf);
+            for ( int i=0; i<frags.length; i++ ) {
+                
msg.getMessage().append(frags[i].getBytesDirect(),0,frags[i].getLength());
+            }
+            return msg;
+        }
+        
+        public boolean expired(long expire) {
+            return (System.currentTimeMillis()-received)>expire;
+        }
+
+        
+        
+    }
+    
+    public static class FragKey {
+        private byte[] uniqueId;
+        private long received = System.currentTimeMillis();
+        public FragKey(byte[] id ) {
+            this.uniqueId = id;
+        }
+        public int hashCode() {
+            return XByteBuffer.toInt(uniqueId,0);
+        }
+        
+        public boolean equals(Object o ) {
+            if ( o instanceof FragKey ) {
+            return Arrays.equals(uniqueId,((FragKey)o).uniqueId);
+        } else return false;
+
+        }
+        
+        public boolean expired(long expire) {
+            return (System.currentTimeMillis()-received)>expire;
+        }
+
+    }
+    
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Tue Feb 28 14:26:50 2006
@@ -331,6 +331,22 @@
             ( ( ( (long) b[off+0]) & 0xFF) << 56);
     }
 
+    
+    /**
+     * Converts an integer to four bytes
+     * @param n - the integer
+     * @return - four bytes in an array
+     */
+    public static byte[] toBytes(boolean bool) {
+        byte[] b = new byte[] {(byte)(bool?1:0)};
+        return b;
+    }
+    
+    public static boolean toBoolean(byte[] b, int offset) {
+        return b[offset] != 0;
+    }
+
+    
     /**
      * Converts an integer to four bytes
      * @param n - the integer



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to