Author: fhanik
Date: Tue Feb 28 14:26:50 2006
New Revision: 381798
URL: http://svn.apache.org/viewcvs?rev=381798&view=rev
Log:
Completed the fragmentation interceptor
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelInterceptor.java
Tue Feb 28 14:26:50 2006
@@ -26,7 +26,7 @@
* Abstract class for the interceptor base class.
* @author Filip Hanik
* @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul
2005) $
- */
+ */
public interface ChannelInterceptor extends MembershipListener {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelSender.java
Tue Feb 28 14:26:50 2006
@@ -22,7 +22,7 @@
* @author Filip Hanik
* @version $Revision: 303950 $, $Date: 2005-06-09 15:38:30 -0500 (Thu, 09 Jun
2005) $
*
- */
+ */
public interface ChannelSender
{
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
Tue Feb 28 14:26:50 2006
@@ -23,6 +23,7 @@
import org.apache.catalina.tribes.mcast.McastService;
import org.apache.catalina.tribes.group.interceptors.GzipInterceptor;
import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor;
/**
* <p>Title: </p>
@@ -55,7 +56,9 @@
.append("\n\t\t[-mdrop multicastdroptime]")
.append("\n\t\t[-gzip]")
.append("\n\t\t[-order]")
- .append("\n\t\t[-ordersize maxorderqueuesize]");
+ .append("\n\t\t[-ordersize maxorderqueuesize]")
+ .append("\n\t\t[-frag]")
+ .append("\n\t\t[-fragsize maxmsgsize]");
return buf;
}
@@ -78,6 +81,8 @@
long mcastdrop = 2000;
boolean order = false;
int ordersize = Integer.MAX_VALUE;
+ boolean frag = false;
+ int fragsize = 1024;
for (int i = 0; i < args.length; i++) {
if ("-bind".equals(args[i])) {
@@ -97,6 +102,11 @@
} else if ("-ordersize".equals(args[i])) {
ordersize = Integer.parseInt(args[++i]);
System.out.println("Setting
OrderInterceptor.maxQueue="+ordersize);
+ } else if ("-frag".equals(args[i])) {
+ frag = true;
+ } else if ("-fragsize".equals(args[i])) {
+ fragsize = Integer.parseInt(args[++i]);
+ System.out.println("Setting
FragmentationInterceptor.maxSize="+fragsize);
} else if ("-ack".equals(args[i])) {
ack = Boolean.parseBoolean(args[++i]);
} else if ("-ackto".equals(args[i])) {
@@ -146,6 +156,11 @@
channel.setMembershipService(service);
if (gzip) channel.addInterceptor(new GzipInterceptor());
+ if ( frag ) {
+ FragmentationInterceptor fi = new FragmentationInterceptor();
+ fi.setMaxSize(fragsize);
+ channel.addInterceptor(fi);
+ }
if (order) {
OrderInterceptor oi = new OrderInterceptor();
oi.setMaxQueue(ordersize);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
Tue Feb 28 14:26:50 2006
@@ -15,13 +15,200 @@
package org.apache.catalina.tribes.group.interceptors;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.InterceptorPayload;
+import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.io.XByteBuffer;
/**
*
- *
+ * The fragmentation interceptor splits up large messages into smaller
messages and assembles them on the other end.
+ * This is very useful when you don't want large messages hogging the sending
sockets
+ * so that smaller messages can still make it through.
+ *
+ * <br><b>Configuration Options</b><br>
+ * FragmentationInterceptor.expire=<milliseconds> - how long we keep the fragments
in memory while waiting for the rest to arrive <b>default=60,000ms -> 60 seconds</b>
+ * This setting is useful to avoid OutOfMemoryErrors<br>
+ * FragmentationInterceptor.maxSize=<max message size> - maximum message size in bytes; larger messages are fragmented
<b>default=1024*100 (around a tenth of a MB)</b><br>
* @author Filip Hanik
* @version 1.0
*/
public class FragmentationInterceptor extends ChannelInterceptorBase {
+ protected HashMap fragpieces = new HashMap();
+ private int maxSize = 1024*100;
+ private long expire = 1000 * 60; //one minute expiration
+
+
+ public void sendMessage(Member[] destination, ChannelMessage msg,
InterceptorPayload payload) throws ChannelException {
+ int size = msg.getMessage().getLength();
+ boolean frag = (size>maxSize);
+ if ( frag ) {
+ frag(destination, msg, payload);
+ }
+ else {
+ byte[] flag = XByteBuffer.toBytes(frag);
+ msg.getMessage().append(flag,0,flag.length);
+ super.sendMessage(destination, msg, payload);
+ }
+ }
+
+ public void messageReceived(ChannelMessage msg) {
+ boolean isFrag =
XByteBuffer.toBoolean(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-1);
+ msg.getMessage().trim(1);
+ if ( isFrag ) {
+ defrag(msg);
+ } else {
+ super.messageReceived(msg);
+ }
+ }
+
+
+ public synchronized FragCollection getFragCollection(FragKey key,
ChannelMessage msg) {
+ FragCollection coll = (FragCollection)fragpieces.get(key);
+ if ( coll == null ) {
+ coll = new FragCollection(msg);
+ fragpieces.put(key,coll);
+ }
+ return coll;
+ }
+
+ public synchronized void removeFragCollection(FragKey key) {
+ fragpieces.remove(key);
+ }
+
+ public void defrag(ChannelMessage msg ) {
+ FragKey key = new FragKey(msg.getUniqueId());
+ FragCollection coll = getFragCollection(key,msg);
+ coll.addMessage(msg);
+
+ if ( coll.complete() ) {
+ removeFragCollection(key);
+ ChannelMessage complete = coll.assemble();
+ super.messageReceived(complete);
+
+ }
+ }
+
+ public void frag(Member[] destination, ChannelMessage msg,
InterceptorPayload payload) throws ChannelException {
+ int size = msg.getMessage().getLength();
+
+ int count = ((size / maxSize )+(size%maxSize==0?0:1));
+ ChannelMessage[] messages = new ChannelMessage[count];
+ int remaining = size;
+ for ( int i=0; i<count; i++ ) {
+ ChannelMessage tmp = msg.clone();
+ int offset = (i*maxSize);
+ int length = Math.min(remaining,maxSize);
+ tmp.getMessage().clear();
+
tmp.getMessage().append(msg.getMessage().getBytesDirect(),offset,length);
+ //add the msg nr
+ tmp.getMessage().append(XByteBuffer.toBytes(i),0,4);
+ //add the total nr of messages
+ tmp.getMessage().append(XByteBuffer.toBytes(count),0,4);
+ //add true as the frag flag
+ byte[] flag = XByteBuffer.toBytes(true);
+ tmp.getMessage().append(flag,0,flag.length);
+ messages[i] = tmp;
+ remaining -= length;
+
+ }
+ for ( int i=0; i<messages.length; i++ ) {
+ super.sendMessage(destination,messages[i],payload);
+ }
+ }
+
+
+
+ public int getMaxSize() {
+ return maxSize;
+ }
+
+ public long getExpire() {
+ return expire;
+ }
+
+ public void setMaxSize(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ public void setExpire(long expire) {
+ this.expire = expire;
+ }
+
+ public static class FragCollection {
+ private long received = System.currentTimeMillis();
+ private ChannelMessage msg;
+ private XByteBuffer[] frags;
+ public FragCollection(ChannelMessage msg) {
+ //get the total messages
+ int count =
XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+ frags = new XByteBuffer[count];
+ this.msg = msg;
+ }
+
+ public void addMessage(ChannelMessage msg) {
+ //remove the total messages
+ msg.getMessage().trim(4);
+ //get the msg nr
+ int nr =
XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+ //remove the msg nr
+ msg.getMessage().trim(4);
+ frags[nr] = msg.getMessage();
+
+ }
+
+ public boolean complete() {
+ boolean result = true;
+ for ( int i=0; (i<frags.length) && (result); i++ ) result =
(frags[i] != null);
+ return result;
+ }
+
+ public ChannelMessage assemble() {
+ if ( !complete() ) throw new IllegalStateException("Fragments are
missing.");
+ int buffersize = 0;
+ for (int i=0; i<frags.length; i++ ) buffersize +=
frags[i].getLength();
+ XByteBuffer buf = new XByteBuffer(buffersize,false);
+ msg.setMessage(buf);
+ for ( int i=0; i<frags.length; i++ ) {
+
msg.getMessage().append(frags[i].getBytesDirect(),0,frags[i].getLength());
+ }
+ return msg;
+ }
+
+ public boolean expired(long expire) {
+ return (System.currentTimeMillis()-received)>expire;
+ }
+
+
+
+ }
+
+ public static class FragKey {
+ private byte[] uniqueId;
+ private long received = System.currentTimeMillis();
+ public FragKey(byte[] id ) {
+ this.uniqueId = id;
+ }
+ public int hashCode() {
+ return XByteBuffer.toInt(uniqueId,0);
+ }
+
+ public boolean equals(Object o ) {
+ if ( o instanceof FragKey ) {
+ return Arrays.equals(uniqueId,((FragKey)o).uniqueId);
+ } else return false;
+
+ }
+
+ public boolean expired(long expire) {
+ return (System.currentTimeMillis()-received)>expire;
+ }
+
+ }
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381798&r1=381797&r2=381798&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
Tue Feb 28 14:26:50 2006
@@ -331,6 +331,22 @@
( ( ( (long) b[off+0]) & 0xFF) << 56);
}
+
+ /**
+ * Converts a boolean to a single byte
+ * @param bool - the boolean
+ * @return - one byte in an array (1 for true, 0 for false)
+ */
+ public static byte[] toBytes(boolean bool) {
+ byte[] b = new byte[] {(byte)(bool?1:0)};
+ return b;
+ }
+
+ public static boolean toBoolean(byte[] b, int offset) {
+ return b[offset] != 0;
+ }
+
+
/**
* Converts an integer to four bytes
* @param n - the integer
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]