Author: fhanik
Date: Tue Feb 28 08:51:07 2006
New Revision: 381703

URL: http://svn.apache.org/viewcvs?rev=381703&view=rev
Log:
Fixed buffer usage

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=381703&r1=381702&r2=381703&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/ChannelCreator.java
 Tue Feb 28 08:51:07 2006
@@ -22,6 +22,7 @@
 import org.apache.catalina.tribes.group.GroupChannel;
 import org.apache.catalina.tribes.mcast.McastService;
 import org.apache.catalina.tribes.group.interceptors.GzipInterceptor;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
 
 /**
  * <p>Title: </p>
@@ -53,6 +54,7 @@
            .append("\n\t\t[-mfreq multicastfrequency]")
            .append("\n\t\t[-mdrop multicastdroptime]")
            .append("\n\t\t[-gzip]")
+           .append("\n\t\t[-order]")
            ;
        return buf;
 
@@ -74,6 +76,7 @@
         int mcastport = 45565;
         long mcastfreq = 500;
         long mcastdrop = 2000;
+        boolean order = false;
         
         for (int i = 0; i < args.length; i++) {
             if ("-bind".equals(args[i])) {
@@ -88,6 +91,8 @@
                 tcpthreadcount = Integer.parseInt(args[++i]);
             } else if ("-gzip".equals(args[i])) {
                 gzip = true;
+            } else if ("-order".equals(args[i])) {
+                order = true;
             } else if ("-ack".equals(args[i])) {
                 ack = Boolean.parseBoolean(args[++i]);
             } else if ("-ackto".equals(args[i])) {
@@ -137,6 +142,7 @@
         channel.setMembershipService(service);
 
         if (gzip) channel.addInterceptor(new GzipInterceptor());
+        if (order) channel.addInterceptor(new OrderInterceptor());
         return channel;
         
     }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381703&r1=381702&r2=381703&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 Tue Feb 28 08:51:07 2006
@@ -121,12 +121,13 @@
     public void run() {
         
         long counter = 0;
+        long total = 0;
         LoadMessage msg = new LoadMessage();
         int messageSize = LoadTest.messageSize;
         
         try {
             startTest();
-            while (counter < msgCount) {
+            while (total < msgCount) {
                 if (channel.getMembers().length == 0 || (!send)) {
                     synchronized (mutex) {
                         try {
@@ -137,7 +138,8 @@
                     }
                 } else {
                     try {
-                        msg.setMsgNr((int)++counter);
+                        msg.setMsgNr((int)++total);
+                        counter++;
                         if (debug) {
                             printArray(msg.getMessage());
                         }
@@ -334,6 +336,7 @@
                 threads = Integer.parseInt(args[++i]);
             } else if ("-count".equals(args[i])) {
                 count = Integer.parseInt(args[++i]);
+                System.out.println("Sending "+count+" messages.");
             } else if ("-pause".equals(args[i])) {
                 pause = Long.parseLong(args[++i])*1000;
             } else if ("-break".equals(args[i])) {
@@ -345,7 +348,7 @@
                 if ( "receive".equals(args[++i]) ) send = false;
             } else if ("-debug".equals(args[i])) {
                 debug = true;
-            } else //("-help".equals(args[i])) 
+            } else if ("-help".equals(args[i])) 
             {
                 usage();
                 System.exit(1);
@@ -357,6 +360,7 @@
         
         LoadTest test = new 
LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
         LoadMessage msg = new LoadMessage();
+        
         messageSize = LoadMessage.getMessageSize(msg);
         channel.setChannelListener(test);
         channel.setMembershipListener(test);
@@ -383,6 +387,9 @@
         
         public void run() {
             System.out.println("Shutting down...");
+            SystemExit exit = new SystemExit(5000);
+            exit.setDaemon(true);
+            exit.start();
             try {
                 channel.stop(channel.DEFAULT);
                 
@@ -390,6 +397,21 @@
                 x.printStackTrace();
             }
             System.out.println("Channel stopped.");
+        }
+    }
+    public static class SystemExit extends Thread {
+        private long delay;
+        public SystemExit(long delay) {
+            this.delay = delay;
+        }
+        public void run () {
+            try {
+                Thread.sleep(delay);
+            }catch ( Exception x ) {
+                x.printStackTrace();
+            }
+            System.exit(0);
+
         }
     }
     

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381703&r1=381702&r2=381703&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 Tue Feb 28 08:51:07 2006
@@ -91,6 +91,7 @@
     public void send(Member[] destination, Serializable msg) throws 
ChannelException {
         if ( msg == null ) return;
         try {
+            if ( destination == null ) destination = getMembers();
             int options = 0;
             ClusterData data = new ClusterData();//generates a unique Id
             data.setAddress(getLocalMember());
@@ -103,7 +104,7 @@
                 b = XByteBuffer.serialize(msg);
             }
             data.setOptions(options);
-            XByteBuffer buffer = new XByteBuffer(b.length+128);
+            XByteBuffer buffer = new XByteBuffer(b.length+128,false);
             buffer.append(b,0,b.length);
             data.setMessage(buffer);
             getFirstInterceptor().sendMessage(destination, data, null);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381703&r1=381702&r2=381703&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 Tue Feb 28 08:51:07 2006
@@ -37,7 +37,7 @@
     private HashMap incounter = new HashMap();
     private HashMap incoming = new HashMap();
     private long expire = 3000;
-    private boolean forwardExpired;
+    private boolean forwardExpired = true;
 
     public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
         for ( int i=0; i<destination.length; i++ ) {
@@ -77,6 +77,7 @@
         while ( tmp != null ) {
             //process expired messages
             if ( tmp.isExpired(expire) ) {
+                System.out.println("Found expired message");
                 //reset the head
                 if ( tmp == head ) head = tmp.next;
                 if ( getForwardExpired() ) 
super.messageReceived(tmp.getMessage());

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=381703&r1=381702&r2=381703&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
 Tue Feb 28 08:51:07 2006
@@ -198,7 +198,7 @@
         System.arraycopy(b,offset,addr,0,addr.length);
         data.setAddress(McastMember.getMember(addr));
         offset += addr.length; //addr data
-        data.message = new XByteBuffer(new byte[XByteBuffer.toInt(b,offset)]);
+        data.message = new XByteBuffer(new 
byte[XByteBuffer.toInt(b,offset)],false);
         offset += 4; //message length
         
System.arraycopy(b,offset,data.message.getBytesDirect(),0,data.message.getLength());
         offset += data.message.getLength(); //message data

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=381703&r1=381702&r2=381703&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
 Tue Feb 28 08:51:07 2006
@@ -52,7 +52,7 @@
     public ObjectReader(SocketChannel channel, Selector selector, 
ListenCallback callback) {
         this.channel = channel;
         this.callback = callback;
-        this.buffer = new XByteBuffer();
+        this.buffer = new XByteBuffer(true);
     }
 
     /**

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381703&r1=381702&r2=381703&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Tue Feb 28 08:51:07 2006
@@ -85,30 +85,38 @@
     protected int bufSize = 0;
     
     /**
+     * 
+     * 
+     */
+    protected boolean discard = true;
+    
+    /**
      * Constructs a new XByteBuffer
      * @param size - the initial size of the byte buffer
      */
-    public XByteBuffer(int size) {
+    public XByteBuffer(int size, boolean discard) {
         buf = new byte[size];
+        this.discard = discard;
     }
     
-    public XByteBuffer(byte[] data) {
-        this(data,data.length+128);
+    public XByteBuffer(byte[] data,boolean discard) {
+        this(data,data.length+128,discard);
     }
     
-    public XByteBuffer(byte[] data, int size) {
+    public XByteBuffer(byte[] data, int size,boolean discard) {
         int length = Math.max(data.length,size);
         buf = new byte[length];
         System.arraycopy(data,0,buf,0,data.length);
         bufSize = data.length;
+        this.discard = discard;
     }
 
 
     /**
      * Constructs a new XByteBuffer with an initial size of 1024 bytes
      */
-    public XByteBuffer()  {
-        this(DEF_SIZE);
+    public XByteBuffer(boolean discard)  {
+        this(DEF_SIZE,discard);
     }
     
     public int getLength() {
@@ -160,11 +168,13 @@
         b.get(buf,bufSize,len);
         
         bufSize = newcount;
-
-        if (bufSize > START_DATA.length && 
(firstIndexOf(buf,0,START_DATA)==-1)){
-            bufSize = 0;
-            log.error("Discarded the package, invalid header");
-            return false;
+        
+        if ( discard ) {
+            if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, 
START_DATA) == -1)) {
+                bufSize = 0;
+                log.error("Discarded the package, invalid header");
+                return false;
+            }
         }
         return true;
 
@@ -188,10 +198,12 @@
         System.arraycopy(b, off, buf, bufSize, len);
         bufSize = newcount;
 
-        if (bufSize > START_DATA.length && 
(firstIndexOf(buf,0,START_DATA)==-1)){
-            bufSize = 0;
-            log.error("Discarded the package, invalid header");
-            return false;
+        if ( discard ) {
+            if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, 
START_DATA) == -1)) {
+                bufSize = 0;
+                log.error("Discarded the package, invalid header");
+                return false;
+            }
         }
         return true;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to