Author: fhanik
Date: Tue Feb 28 08:14:20 2006
New Revision: 381687

URL: http://svn.apache.org/viewcvs?rev=381687&view=rev
Log:
Added order protocol, created a command line channel creator

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
Removed:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.jbx
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
 Tue Feb 28 08:14:20 2006
@@ -16,6 +16,7 @@
 package org.apache.catalina.tribes;
 
 import java.io.Serializable;
+import org.apache.catalina.tribes.io.XByteBuffer;
 
 /**
  * @author Filip Hanik
@@ -61,11 +62,13 @@
      */
     public byte[] getUniqueId();
     
-    public void setMessage(byte[] data);
+    public void setMessage(XByteBuffer buf);
     
-    public byte[] getMessage();
+    public XByteBuffer getMessage();
     
     public int getOptions();
     public void setOptions(int options);
+    
+    public ChannelMessage clone();
 
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
 Tue Feb 28 08:14:20 2006
@@ -69,6 +69,8 @@
      */
     public boolean hasMembers();
     
+    
+    public Member getMember(Member mbr);
     /**
      * Returns a list of all the members in the cluster.
      */

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 Tue Feb 28 08:14:20 2006
@@ -1,3 +1,18 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.catalina.tribes.demos;
 
 import org.apache.catalina.tribes.Member;
@@ -288,20 +303,15 @@
         System.out.println("Usage:\n\t"+
                            "java LoadTest [options]\n\t"+
                            "Options:\n\t\t"+
-                           "[-bind tcpbindaddress] \n\t\t"+
-                           "[-port tcplistenport]  \n\t\t"+
-                           "[-mbind multicastbindaddr]  \n\t\t"+
                            "[-mode receive|send|both]  \n\t\t"+
                            "[-debug]  \n\t\t"+
                            "[-count messagecount]  \n\t\t"+
                            "[-stats statinterval]  \n\t\t"+
-                           "[-ack true|false]  \n\t\t"+
-                           "[-sync true|false]  \n\t\t"+
-                           "[-gzip]  \n\t\t"+
                            "[-pause nrofsecondstopausebetweensends]  \n\t\t"+
-                           "[-sender pooled|fastasyncqueue]  \n\t\t"+
                            "[-threads numberofsenderthreads]  \n\t\t"+
                            "[-break (halts execution on exception)]\n"+
+                           "\tChannel options:"+
+                           ChannelCreator.usage()+"\n\n"+
                            "Example:\n\t"+
                            "java LoadTest -port 4004\n\t"+
                            "java LoadTest -bind 192.168.0.45 -port 4005\n\t"+
@@ -309,49 +319,28 @@
     }
     
     public static void main(String[] args) throws Exception {
-        String bind  = "auto";
-        int port = 4001;
         boolean send = true;
-        String mbind = null;
         boolean debug = false;
-        boolean ack = true;
-        boolean sync = true;
-        boolean gzip = false;
         long pause = 0;
         int count = 1000000;
         int stats = 10000;
         boolean breakOnEx = false;
         int threads = 1;
-        String sender = "pooled";
         if ( args.length == 0 ) {
             args = new String[] {"-help"};
         }
         for (int i = 0; i < args.length; i++) {
-            if ("-bind".equals(args[i])) {
-                bind = args[++i];
-            } else if ("-sender".equals(args[i])) {
-                sender = args[++i];
-            } else if ("-port".equals(args[i])) {
-                port = Integer.parseInt(args[++i]);
-            } else if ("-threads".equals(args[i])) {
+            if ("-threads".equals(args[i])) {
                 threads = Integer.parseInt(args[++i]);
             } else if ("-count".equals(args[i])) {
                 count = Integer.parseInt(args[++i]);
             } else if ("-pause".equals(args[i])) {
                 pause = Long.parseLong(args[++i])*1000;
-            } else if ("-gzip".equals(args[i])) {
-                gzip = true;
             } else if ("-break".equals(args[i])) {
                 breakOnEx = true;
-            } else if ("-ack".equals(args[i])) {
-                ack = Boolean.parseBoolean(args[++i]);
-            } else if ("-sync".equals(args[i])) {
-                sync = Boolean.parseBoolean(args[++i]);
             } else if ("-stats".equals(args[i])) {
                 stats = Integer.parseInt(args[++i]);
                 System.out.println("Stats every "+stats+" message");
-            } else if ("-mbind".equals(args[i])) {
-                mbind = args[++i];
             } else if ("-mode".equals(args[i])) {
                 if ( "receive".equals(args[++i]) ) send = false;
             } else if ("-debug".equals(args[i])) {
@@ -364,34 +353,7 @@
         }
         
         
-        ReplicationListener rl = new ReplicationListener();
-        rl.setTcpListenAddress(bind);
-        rl.setTcpListenPort(port);
-        rl.setTcpSelectorTimeout(100);
-        rl.setTcpThreadCount(4);
-        rl.getBind();
-        rl.setSendAck(ack);
-        rl.setSynchronized(sync);
-
-        ReplicationTransmitter ps = new ReplicationTransmitter();
-        ps.setReplicationMode(sender);
-        ps.setAckTimeout(15000);
-        ps.setAutoConnect(true);
-        ps.setWaitForAck(ack);
-
-        McastService service = new McastService();
-        service.setMcastAddr("228.0.0.5");
-        if ( mbind != null ) service.setMcastBindAddress(mbind);
-        service.setMcastFrequency(500);
-        service.setMcastDropTime(2000);
-        service.setMcastPort(45565);
-
-        ManagedChannel channel = new GroupChannel();
-        channel.setChannelReceiver(rl);
-        channel.setChannelSender(ps);
-        channel.setMembershipService(service);
-        
-        if ( gzip ) channel.addInterceptor(new GzipInterceptor());
+        ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
         
         LoadTest test = new 
LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
         LoadMessage msg = new LoadMessage();

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
 Tue Feb 28 08:14:20 2006
@@ -22,9 +22,7 @@
 import org.apache.catalina.tribes.ChannelSender;
 import org.apache.catalina.tribes.ChannelReceiver;
 import org.apache.catalina.tribes.Channel;
-import java.io.IOException;
 import org.apache.catalina.tribes.InterceptorPayload;
-import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.MessageListener;
 
 

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
 Tue Feb 28 08:14:20 2006
@@ -103,8 +103,9 @@
                 b = XByteBuffer.serialize(msg);
             }
             data.setOptions(options);
-            
-            data.setMessage(b);
+            XByteBuffer buffer = new XByteBuffer(b.length+128);
+            buffer.append(b,0,b.length);
+            data.setMessage(buffer);
             getFirstInterceptor().sendMessage(destination, data, null);
         }catch ( Exception x ) {
             if ( x instanceof ChannelException ) throw (ChannelException)x;
@@ -118,9 +119,9 @@
             
             Serializable fwd = null;
             if ( (msg.getOptions() & BYTE_MESSAGE) == BYTE_MESSAGE ) {
-                fwd = new ByteMessage(msg.getMessage());
+                fwd = new ByteMessage(msg.getMessage().getBytes());
             } else {
-                fwd = XByteBuffer.deserialize(msg.getMessage());
+                fwd = XByteBuffer.deserialize(msg.getMessage().getBytes());
             }
             if ( channelListener != null && 
channelListener.accept(fwd,msg.getAddress())) 
                 channelListener.messageReceived(fwd,msg.getAddress());

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
 Tue Feb 28 08:14:20 2006
@@ -41,7 +41,9 @@
     
     public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
         try {
-            msg.setMessage(compress(msg.getMessage()));
+            byte[] data = compress(msg.getMessage().getBytes());
+            msg.getMessage().trim(msg.getMessage().getLength());
+            msg.getMessage().append(data,0,data.length);
             getNext().sendMessage(destination, msg, payload);
         } catch ( IOException x ) {
             log.error("Unable to compress byte contents");
@@ -51,7 +53,9 @@
 
     public void messageReceived(ChannelMessage msg) {
         try {
-            msg.setMessage(decompress(msg.getMessage()));
+            byte[] data = decompress(msg.getMessage().getBytes());
+            msg.getMessage().trim(msg.getMessage().getLength());
+            msg.getMessage().append(data,0,data.length);
             getPrevious().messageReceived(msg);
         } catch ( IOException x ) {
             log.error("Unable to decompress byte contents",x);

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381687&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 Tue Feb 28 08:14:20 2006
@@ -0,0 +1,302 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.InterceptorPayload;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.io.XByteBuffer;
+
+/**
+ * Channel interceptor that delivers the messages of each sender in the order
+ * they were sent. When sending, a per-destination sequence number is appended
+ * to the payload; when receiving, the number is stripped off again and any
+ * message that arrives ahead of its turn is queued until the gap is filled or
+ * the message expires.
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class OrderInterceptor extends ChannelInterceptorBase {
+    /** Last sequence number handed out, keyed by destination member. */
+    private HashMap<Member,Counter> outcounter = new HashMap<Member,Counter>();
+    /** Next sequence number expected, keyed by sending member. */
+    private HashMap<Member,Counter> incounter = new HashMap<Member,Counter>();
+    /** Head of the sorted queue of early messages, keyed by sending member. */
+    private HashMap<Member,MessageOrder> incoming = new HashMap<Member,MessageOrder>();
+    /** Milliseconds a queued message may wait before it counts as expired. */
+    private long expire = 3000;
+    /** Whether expired messages are passed upwards instead of being dropped. */
+    private boolean forwardExpired;
+
+    /**
+     * Sends a separately numbered clone of the message to every destination.
+     *
+     * @param destination members to send to
+     * @param msg message to send; one clone is made per destination
+     * @param payload handed unchanged to the next interceptor
+     * @throws ChannelException if the next interceptor fails to send
+     */
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+        for ( int i=0; i<destination.length; i++ ) {
+            ChannelMessage tmp = msg.clone();
+            int nr = incCounter(destination[i]);
+            //the sequence number travels as the last four bytes of the payload
+            tmp.getMessage().append(XByteBuffer.toBytes(nr),0,4);
+            getNext().sendMessage(new Member[] {destination[i]}, tmp, payload);
+        }
+    }
+
+    /**
+     * Strips the sequence number off the end of the payload and passes the
+     * message to {@link #processIncoming(MessageOrder)} for ordered delivery.
+     *
+     * @param msg message received from the next interceptor
+     */
+    public void messageReceived(ChannelMessage msg) {
+        XByteBuffer buf = msg.getMessage();
+        int msgnr = XByteBuffer.toInt(buf.getBytesDirect(),buf.getLength()-4);
+        buf.trim(4);
+        processIncoming(new MessageOrder(msgnr,msg));
+    }
+
+    /**
+     * Queues the message with the others pending from the same sender, delivers
+     * every queued message that is next in sequence, and then removes the
+     * queued messages that have waited longer than {@link #getExpire()}.
+     * Expired messages are delivered only if {@link #getForwardExpired()} is set.
+     *
+     * @param order the received message together with its sequence number
+     */
+    public synchronized void processIncoming(MessageOrder order) {
+        Member member = order.getMessage().getAddress();
+        Counter cnt = getInCounter(member);
+
+        order = MessageOrder.add(incoming.get(member),order);
+        while ( (order!=null) && (order.getMsgNr() == cnt.getCounter()) ) {
+            //we are right on target. process orders
+            cnt.inc();
+            super.messageReceived(order.getMessage());
+            order.setMessage(null);
+            order = order.getNext();
+        }
+        MessageOrder head = order;
+        MessageOrder prev = null;
+        MessageOrder cur = order;
+        while ( cur != null ) {
+            //read the successor first so the walk advances on both branches
+            MessageOrder next = cur.getNext();
+            if ( cur.isExpired(expire) ) {
+                //unlink the expired entry, moving the head if it was first
+                if ( cur == head ) head = next;
+                else if ( prev != null ) prev.setNext(next);
+                if ( getForwardExpired() ) super.messageReceived(cur.getMessage());
+                cur.setMessage(null);
+            } else {
+                prev = cur;
+            }
+            cur = next;
+        }
+        if ( head == null ) incoming.remove(member);
+        else incoming.put(member,head);
+    }
+
+    /**
+     * Creates the counters for the new member, then notifies upwards.
+     *
+     * @param member the member that joined
+     */
+    public void memberAdded(Member member) {
+        getInCounter(member);
+        getOutCounter(member);
+        super.memberAdded(member);
+    }
+
+    /**
+     * Discards the counters and queued messages of the member, then notifies
+     * upwards.
+     *
+     * @param member the member that left
+     */
+    public void memberDisappeared(Member member) {
+        //take the lock the other map accessors use; release it before calling up
+        synchronized (this) {
+            outcounter.remove(member);
+            incounter.remove(member);
+            incoming.remove(member);
+        }
+        super.memberDisappeared(member);
+    }
+
+    /**
+     * Advances the outgoing counter of the member.
+     *
+     * @param mbr destination member
+     * @return the sequence number to put on the next message to that member
+     */
+    public int incCounter(Member mbr) {
+        return getOutCounter(mbr).inc();
+    }
+
+    /**
+     * Returns the incoming counter of the member, creating it on first use.
+     *
+     * @param mbr sending member
+     * @return counter holding the next sequence number expected from the member
+     */
+    public synchronized Counter getInCounter(Member mbr) {
+        Counter cnt = incounter.get(mbr);
+        if ( cnt == null ) {
+            cnt = new Counter();
+            cnt.inc(); //always start at 1 for incoming
+            incounter.put(mbr,cnt);
+        }
+        return cnt;
+    }
+
+    /**
+     * Returns the outgoing counter of the member, creating it on first use.
+     *
+     * @param mbr destination member
+     * @return counter holding the last sequence number sent to the member
+     */
+    public synchronized Counter getOutCounter(Member mbr) {
+        Counter cnt = outcounter.get(mbr);
+        if ( cnt == null ) {
+            cnt = new Counter();
+            outcounter.put(mbr,cnt);
+        }
+        return cnt;
+    }
+
+    /** Integer counter whose reads and increments are synchronized. */
+    public static class Counter {
+        private int value = 0;
+
+        public synchronized int getCounter() {
+            return value;
+        }
+
+        public synchronized int inc() {
+            return ++value;
+        }
+    }
+
+    /**
+     * Node of a singly linked list of received messages, kept sorted by
+     * ascending sequence number.
+     */
+    public static class MessageOrder {
+        private long received = System.currentTimeMillis();
+        private MessageOrder next;
+        private int msgNr;
+        private ChannelMessage msg = null;
+
+        public MessageOrder(int msgNr,ChannelMessage msg) {
+            this.msgNr = msgNr;
+            this.msg = msg;
+        }
+
+        /**
+         * @param expireTime maximum age in milliseconds
+         * @return true if this node was created more than expireTime ago
+         */
+        public boolean isExpired(long expireTime) {
+            return (System.currentTimeMillis()-received) > expireTime;
+        }
+
+        public ChannelMessage getMessage() {
+            return msg;
+        }
+
+        public void setMessage(ChannelMessage msg) {
+            this.msg = msg;
+        }
+
+        public void setNext(MessageOrder order) {
+            this.next = order;
+        }
+
+        public MessageOrder getNext() {
+            return next;
+        }
+
+        /**
+         * Inserts a node into a list sorted by sequence number.
+         *
+         * @param head first node of the list, may be null
+         * @param add node to insert, may be null
+         * @return the first node of the resulting list
+         * @throws ArithmeticException if the list already holds the same sequence number
+         */
+        public static MessageOrder add(MessageOrder head, MessageOrder add) {
+            if ( head == null ) return add;
+            if ( add == null ) return head;
+            if ( head == add ) return add;
+
+            if ( head.getMsgNr() > add.getMsgNr() ) {
+                //new first node; the old head keeps its own successors
+                add.next = head;
+                return add;
+            }
+
+            MessageOrder iter = head;
+            MessageOrder prev = null;
+            while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {
+                prev = iter;
+                iter = iter.next;
+            }
+            if ( iter.getMsgNr() < add.getMsgNr() ) {
+                //add after
+                add.next = iter.next;
+                iter.next = add;
+            } else if (iter.getMsgNr() > add.getMsgNr()) {
+                //add before
+                prev.next = add;
+                add.next = iter;
+            } else {
+                throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");
+            }
+            return head;
+        }
+
+        public int getMsgNr() {
+            return msgNr;
+        }
+    }
+
+    public void setExpire(long expire) {
+        this.expire = expire;
+    }
+
+    public void setForwardExpired(boolean forwardExpired) {
+        this.forwardExpired = forwardExpired;
+    }
+
+    public long getExpire() {
+        return expire;
+    }
+
+    public boolean getForwardExpired() {
+        return forwardExpired;
+    }
+
+}

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
 Tue Feb 28 08:14:20 2006
@@ -39,7 +39,7 @@
 public class ClusterData implements ChannelMessage {
 
     private int options = 0 ;
-    private byte[] message ;
+    private XByteBuffer message ;
     private long timestamp ;
     private byte[] uniqueId ;
     private Member address;
@@ -60,7 +60,7 @@
      * @param message message data
      * @param timestamp message creation date
      */
-    public ClusterData(byte[] uniqueId, byte[] message, long timestamp) {
+    public ClusterData(byte[] uniqueId, XByteBuffer message, long timestamp) {
         this.uniqueId = uniqueId;
         this.message = message;
         this.timestamp = timestamp;
@@ -69,13 +69,13 @@
     /**
      * @return Returns the message.
      */
-    public byte[] getMessage() {
+    public XByteBuffer getMessage() {
         return message;
     }
     /**
      * @param message The message to set.
      */
-    public void setMessage(byte[] message) {
+    public void setMessage(XByteBuffer message) {
         this.message = message;
     }
     /**
@@ -150,7 +150,7 @@
 
      * @return byte[]
      */
-    public byte[] getDataPackage() throws IOException {
+    public byte[] getDataPackage()  {
         byte[] addr = ((McastMember)address).getData();
         int length = 
             4 + //options
@@ -160,7 +160,7 @@
             4 + //addr length off=12+uniqueId.length+4
             addr.length+ //member data off=12+uniqueId.length+4+add.length
             4 + //message length off=12+uniqueId.length+4+add.length+4
-            message.length;
+            message.getLength();
         byte[] data = new byte[length];
         int offset = 0;
         XByteBuffer.toBytes(options,data,offset);
@@ -175,14 +175,14 @@
         offset += 4; //addr.length
         System.arraycopy(addr,0,data,offset,addr.length);
         offset += addr.length; //addr data
-        XByteBuffer.toBytes(message.length,data,offset);
+        XByteBuffer.toBytes(message.getLength(),data,offset);
         offset += 4; //message.length
-        System.arraycopy(message,0,data,offset,message.length);
-        offset += message.length; //message data
+        System.arraycopy(message.getBytesDirect(),0,data,offset,message.getLength());
+        offset += message.getLength(); //message data
         return data;
     }
     
-    public static ClusterData getDataFromPackage(byte[] b) throws IOException {
+    public static ClusterData getDataFromPackage(byte[] b)  {
         ClusterData data = new ClusterData(false);
         int offset = 0;
         data.setOptions(XByteBuffer.toInt(b,offset));
@@ -198,30 +198,26 @@
         System.arraycopy(b,offset,addr,0,addr.length);
         data.setAddress(McastMember.getMember(addr));
         offset += addr.length; //addr data
-        data.message = new byte[XByteBuffer.toInt(b,offset)];
+        data.message = new XByteBuffer(new byte[XByteBuffer.toInt(b,offset)]);
         offset += 4; //message length
-        System.arraycopy(b,offset,data.message,0,data.message.length);
-        offset += data.message.length; //message data
+        System.arraycopy(b,offset,data.message.getBytesDirect(),0,data.message.getLength());
+        offset += data.message.getLength(); //message data
         return data;
     }
     
-    public static void main(String[] args) throws Exception {
-        ClusterData data1 = new ClusterData();
-        data1.setAddress(new 
McastMember("domain","127.0.0.1",1000,System.currentTimeMillis()));
-        data1.setMessage(new byte[1024]);
-        
-        byte[] b = data1.getDataPackage();
-        
-        ClusterData data2 = ClusterData.getDataFromPackage(b);
-        
-        if ( !(data1.getAddress().equals(data2.getAddress())) ||
-             !(Arrays.equals(data1.getMessage(),data2.getMessage())) ||
-             !(Arrays.equals(data1.getUniqueId(),data2.getUniqueId())) ||
-             !(data1.getTimestamp() == data2.timestamp) ||
-             !(data1.getOptions() == data2.getOptions() ) ) {
-            throw new Exception("Not Equal");
-        }
-                         
+    public int hashCode() {
+        return XByteBuffer.toInt(getUniqueId(),0);
+    }
+    
+    public boolean equals(Object o) {
+        if ( o instanceof ClusterData ) {
+            return Arrays.equals(getUniqueId(),((ClusterData)o).getUniqueId());
+        } else return false;
+    }
+    
+    public ClusterData clone() {
+        byte[] d = this.getDataPackage();
+        return ClusterData.getDataFromPackage(d);
     }
     
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Tue Feb 28 08:14:20 2006
@@ -91,6 +91,18 @@
     public XByteBuffer(int size) {
         buf = new byte[size];
     }
+    
+    public XByteBuffer(byte[] data) {
+        this(data,data.length+128);
+    }
+    
+    public XByteBuffer(byte[] data, int size) {
+        int length = Math.max(data.length,size);
+        buf = new byte[length];
+        System.arraycopy(data,0,buf,0,data.length);
+        bufSize = data.length;
+    }
+
 
     /**
      * Constructs a new XByteBuffer with an initial size of 1024 bytes
@@ -98,6 +110,20 @@
     public XByteBuffer()  {
         this(DEF_SIZE);
     }
+    
+    public int getLength() {
+        return bufSize;
+    }
+    
+    public void trim(int length) {
+        if ( (bufSize - length) < 0 ) 
+            throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);
+        bufSize -= length;
+    }
+            
+    public byte[] getBytesDirect() {
+        return this.buf;
+    }
 
     /**
      * Returns the bytes in the buffer, in its exact length
@@ -415,11 +441,6 @@
         ByteArrayOutputStream outs = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(outs);
         out.writeObject(msg);
-        // flush out the gzip stream to byte buffer
-//        if(out != null) {
-//            out.flush();
-//            out.close();
-//        }
         byte[] data = outs.toByteArray();
         return data;
     }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
 Tue Feb 28 08:14:20 2006
@@ -135,7 +135,7 @@
      * @return - the bytes for this member deserialized
      * @throws Exception
      */
-    public byte[] getData() throws IOException {
+    public byte[] getData()  {
         //package looks like
         //alive - 8 bytes
         //port - 4 bytes

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMembership.java
 Tue Feb 28 08:14:20 2006
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import org.apache.catalina.tribes.Member;
 
 /**
  * A <b>membership</b> implementation using simple multicast.
@@ -190,6 +191,19 @@
      */
     public synchronized boolean hasMembers() {
         return members.length > 0 ;
+    }
+    
+    
+    public synchronized McastMember getMember(Member mbr) {
+        if(hasMembers()) {
+            McastMember result = null;
+            for ( int i=0; i<this.members.length && result==null; i++ ) {
+                if ( members[i].equals(mbr) ) result = members[i];
+            }//for
+            return result;
+        } else {
+            return null;
+        }
     }
  
     /**

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastService.java
 Tue Feb 28 08:14:20 2006
@@ -352,6 +352,11 @@
        if ( impl == null || impl.membership == null ) return false;
        return impl.membership.hasMembers();
     }
+    
+    public Member getMember(Member mbr) {
+        if ( impl == null || impl.membership == null ) return null;
+        return impl.membership.getMember(mbr);
+    }
 
     /**
      * Return all the members

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/AsyncSocketSender.java
 Tue Feb 28 08:14:20 2006
@@ -166,12 +166,12 @@
         synchronized (this) {
             inQueueCounter++;
             if(queueThread != null)
-                queueThread.incQueuedNrOfBytes(data.getMessage().length);
+                queueThread.incQueuedNrOfBytes(data.getMessage().getLength());
        }
         if (log.isTraceEnabled())
             log.trace(sm.getString("AsyncSocketSender.queue.message",
                     getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
-                            data.getMessage().length)));
+                            data.getMessage().getLength())));
     }
 
     /*
@@ -268,7 +268,7 @@
                     int messagesize = 0;
                     try {
                         ChannelMessage data = (ChannelMessage) entry.getValue();
-                        messagesize = data.getMessage().length;
+                        messagesize = data.getMessage().getLength();
                         sender.pushMessage(data);
                     } catch (Exception x) {
                         log.warn(sm.getString("AsyncSocketSender.send.error",

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 Tue Feb 28 08:14:20 2006
@@ -827,8 +827,8 @@
             checkKeepAlive();
             if(doProcessingStats) addProcessingStats(time);
             if(messageTransfered) {
-                addStats(data.getMessage().length);
-                if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().length)));
+                addStats(data.getMessage().getLength());
+                if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.message", address.getHostAddress(),new Integer(port), data.getUniqueId(), new Long(data.getMessage().getLength())));
             } else {
                 dataFailureCounter++;
                 if ( exception != null ) throw exception;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/FastAsyncSocketSender.java
 Tue Feb 28 08:14:20 2006
@@ -319,12 +319,12 @@
         synchronized (this) {
             inQueueCounter++;
             if(queueThread != null)
-                queueThread.incQueuedNrOfBytes(data.getMessage().length);
+                queueThread.incQueuedNrOfBytes(data.getMessage().getLength());
         }
         if (log.isTraceEnabled())
             log.trace(sm.getString("AsyncSocketSender.queue.message",
                     getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
-                            data.getMessage().length)));
+                            data.getMessage().getLength())));
     }
 
     /**
@@ -480,7 +480,7 @@
                 int messagesize = 0;
                 try {
                     ChannelMessage data = (ChannelMessage) entry.data();
-                    messagesize = data.getMessage().length;
+                    messagesize = data.getMessage().getLength();
                     sender.pushMessage(data);
                 } catch (Exception x) {
                     log.warn(sm.getString(

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/PooledSocketSender.java
 Tue Feb 28 08:14:20 2006
@@ -132,7 +132,7 @@
             //return the connection to the pool
             senderQueue.returnSender(sender);
         }
-        addStats(data.getMessage().length);
+        addStats(data.getMessage().getLength());
     }
 
     public String toString() {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=381687&r1=381686&r2=381687&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 Tue Feb 28 08:14:20 2006
@@ -602,7 +602,7 @@
             }
             sender.sendMessage(data);
             sender.setSuspect(false);
-            addStats(data.getMessage().length);
+            addStats(data.getMessage().getLength());
         } catch (IOException x) {
             if (!sender.getSuspect()) {
                 if (log.isErrorEnabled() ) log.error("Unable to send replicated message, is member ["+sender.toString()+"] down?",x);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to