Author: fhanik
Date: Mon Feb 27 10:02:08 2006
New Revision: 381399

URL: http://svn.apache.org/viewcvs?rev=381399&view=rev
Log:
Added in a very usable functionality/load test

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381399&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
 Mon Feb 27 10:02:08 2006
@@ -0,0 +1,374 @@
+package org.apache.catalina.tribes.demos;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.ChannelListener;
+import java.io.Serializable;
+import org.apache.catalina.tribes.ManagedChannel;
+import java.io.Externalizable;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.util.Random;
+import java.io.ObjectInput;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.tcp.ReplicationListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.tcp.ReplicationTransmitter;
+import org.apache.catalina.tribes.mcast.McastService;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.group.interceptors.GzipInterceptor;
+import org.apache.catalina.tribes.ChannelException;
+
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class LoadTest implements MembershipListener,ChannelListener, Runnable {
+    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(LoadTest.class);
+    public static Object mutex = new Object();
+    public boolean doRun = true;
+    
+    public long bytesReceived = 0;
+    public int  messagesReceived = 0;
+    public boolean send = true;
+    public boolean debug = false;
+    public int msgCount = 100;
+    ManagedChannel channel=null;
+    public int statsInterval = 10000;
+    public long pause = 0;
+    public boolean breakonChannelException = false;
+    public long receiveStart = 0;
+    
+    static int messageSize = 0;
+    
+    
+    
+    public LoadTest(ManagedChannel channel, 
+                    boolean send,
+                    int msgCount,
+                    boolean debug,
+                    long pause,
+                    int stats,
+                    boolean breakOnEx) {
+        this.channel = channel;
+        this.send = send;
+        this.msgCount = msgCount;
+        this.debug = debug;
+        this.pause = pause;
+        this.statsInterval = stats;
+        this.breakonChannelException = breakOnEx;
+    }
+    
+    
+    
+    public void run() {
+        
+        int counter = 0;
+        LoadMessage msg = new LoadMessage();
+        int messageSize = LoadTest.messageSize;
+        long sendTime = 0;
+        try {
+            while (counter < msgCount) {
+                if (channel.getMembers().length == 0 || (!send)) {
+                    synchronized (mutex) {
+                        try {
+                            mutex.wait();
+                        } catch (InterruptedException x) {
+                            log.info("Thread interrupted from wait");
+                        }
+                    }
+                } else {
+                    try {
+                        msg.setMsgNr(++counter);
+                        long start = System.currentTimeMillis();
+                        if (debug) {
+                            printArray(msg.getMessage());
+                        }
+                        channel.send(null, msg);
+                        if ( pause > 0 ) {
+                            if ( debug) System.out.println("Pausing sender for 
"+pause+" ms.");
+                            Thread.sleep(pause);
+                        }
+                        sendTime += (System.currentTimeMillis() - start);
+                    } catch (ChannelException x) {
+                        log.error("Unable to send message.");
+                        Member[] faulty = x.getFaultyMembers();
+                        for (int i=0; i<faulty.length; i++ ) 
log.error("Faulty: "+faulty[i]);
+                        --counter;
+                        if ( this.breakonChannelException ) throw x;
+                    }
+                }
+                if ( (counter % statsInterval) == 0 && (counter > 0)) {
+                    printSendStats(counter, messageSize, sendTime);
+                }
+
+            }
+        }catch ( Exception x ) {
+            x.printStackTrace();
+            printSendStats(counter, messageSize, sendTime);
+        }
+    }
+
+    private void printSendStats(int counter, int messageSize, long sendTime) {
+        float cnt = (float)counter;
+        float size = (float)messageSize;
+        float time = (float)sendTime / 1000;
+        log.info("****SEND STATS*****"+
+                 "\n\tMessage count:"+counter+
+                 "\n\tTotal bytes  :"+(long)(size*cnt)+
+                 "\n\tTotal seconds:"+(time)+
+                 "\n\tBytes/second :"+(size*cnt/time)+
+                 "\n\tMBytes/second:"+(size*cnt/time/1024f/1024f));
+    }
+
+    /**
+     * memberAdded
+     *
+     * @param member Member
+     * @todo Implement this org.apache.catalina.tribes.MembershipListener
+     *   method
+     */
+    public void memberAdded(Member member) {
+        log.info("Member added:"+member);
+        synchronized (mutex) {
+            mutex.notifyAll();
+        }
+    }
+
+    /**
+     * memberDisappeared
+     *
+     * @param member Member
+     * @todo Implement this org.apache.catalina.tribes.MembershipListener
+     *   method
+     */
+    public void memberDisappeared(Member member) {
+        log.info("Member disappeared:"+member);
+    }
+    
+    public boolean accept(Serializable msg, Member mbr){ 
+       return (msg instanceof LoadMessage);
+    }
+    
+    public void messageReceived(Serializable msg, Member mbr){ 
+        if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
+        if ( debug ) {
+            if ( msg instanceof LoadMessage ) {
+                printArray(((LoadMessage)msg).getMessage());
+            }
+        }
+        
+        if ( msg instanceof ByteMessage ) {
+            LoadMessage tmp = new LoadMessage();
+            tmp.setMessage(((ByteMessage)msg).getMessage());
+            msg = tmp;
+            tmp = null;
+        }
+        
+        bytesReceived+=((LoadMessage)msg).getMessage().length;
+        messagesReceived++;
+        if ( (messagesReceived%statsInterval)==0 || 
(messagesReceived==msgCount)) {
+            float bytes = 
(float)(((LoadMessage)msg).getMessage().length*messagesReceived);
+            float seconds = ((float)(System.currentTimeMillis()-receiveStart)) 
/ 1000f;
+            log.info("****RECEIVE STATS*****"+
+                     "\n\tMessage count :"+(long)messagesReceived+
+                     "\n\tTotal bytes   :"+bytes+
+                     "\n\tTime since 1st:"+seconds+" seconds"+
+                     "\n\tBytes/second  :"+(bytes/seconds)+
+                     "\n\tMBytes/second :"+(bytes/seconds/1024f/1024f));
+
+        }
+    }
+    
+    
+    public static void printArray(byte[] data) {
+        System.out.print("{");
+        for (int i=0; i<data.length; i++ ) {
+            System.out.print(data[i]);
+            System.out.print(",");
+        }
+        System.out.println("} size:"+data.length);
+    }
+
+    
+    
+    public static class LoadMessage extends ByteMessage implements 
Serializable  {
+        public static int size = 1020;
+        public static byte[] outdata = new byte[size];
+        public static Random r = new Random(System.currentTimeMillis());
+        public static int getMessageSize (LoadMessage msg) {
+            int messageSize = msg.getMessage().length;
+            if ( ((Object)msg) instanceof ByteMessage ) return messageSize;
+            try {
+                messageSize  = XByteBuffer.serialize(new LoadMessage()).length;
+                log.info("Average message size:" + messageSize + " bytes");
+            } catch (Exception x) {
+                log.error("Unable to calculate test message size.", x);
+            }
+            return messageSize;
+        }
+        
+        protected byte[] message = null;
+        private int msgNr;
+
+        static {
+            r.nextBytes(outdata);
+            
+        }
+        
+        public int getMsgNr() {
+            return XByteBuffer.toInt(getMessage(),0);
+        }
+        
+        public void setMsgNr(int nr) {
+            byte[] data = XByteBuffer.toBytes(nr);
+            System.arraycopy(data,0,getMessage(),0,4);
+            setMessage(getMessage());
+        }
+        
+        public byte[] getMessage() {
+            byte[] data = new byte[size+4];
+            System.arraycopy(XByteBuffer.toBytes(msgNr),0,data,0,4);
+            if ( message != null ) {
+                System.arraycopy(message, 0, data, 4, message.length);
+            }else {
+                System.arraycopy(outdata, 0, data, 4, outdata.length);
+            }
+            return data;
+        }
+        
+        public void setMessage(byte[] data) {
+            this.msgNr = XByteBuffer.toInt(data,0);
+            this.message = new byte[data.length-4];
+            System.arraycopy(data,4,message,0,message.length);
+        }
+    }
+    
+    public static void usage() {
+        System.out.println("Tribes Load tester.");
+        System.out.println("The load tester can be used in sender or received 
mode or both");
+        System.out.println("Usage:\n\t"+
+                           "java LoadTest [options]\n\t"+
+                           "Options:\n\t\t"+
+                           "[-bind tcpbindaddress] \n\t\t"+
+                           "[-port tcplistenport]  \n\t\t"+
+                           "[-mbind multicastbindaddr]  \n\t\t"+
+                           "[-mode receive|send|both]  \n\t\t"+
+                           "[-debug]  \n\t\t"+
+                           "[-count messagecount]  \n\t\t"+
+                           "[-stats statinterval]  \n\t\t"+
+                           "[-ack true|false]  \n\t\t"+
+                           "[-sync true|false]  \n\t\t"+
+                           "[-gzip]  \n\t\t"+
+                           "[-pause nrofsecondstopausebetweensends]  \n\t\t"+
+                           "[-sender pooled|fastasyncqueue]  \n\t\t"+
+                           "[-break (halts execution on exception)]\n"+
+                           "Example:\n\t"+
+                           "java LoadTest -port 4004\n\t"+
+                           "java LoadTest -bind 192.168.0.45 -port 4005\n\t"+
+                           "java LoadTest -bind 192.168.0.45 -port 4005 -mbind 
192.168.0.45 -count 100 -stats 10\n");
+    }
+    
+    public static void main(String[] args) throws Exception {
+        String bind  = "auto";
+        int port = 4001;
+        boolean send = true;
+        String mbind = null;
+        boolean debug = false;
+        boolean ack = true;
+        boolean sync = true;
+        boolean gzip = false;
+        long pause = 0;
+        int count = 1000000;
+        int stats = 10000;
+        boolean breakOnEx = false;
+        String sender = "pooled";
+        if ( args.length == 0 ) {
+            args = new String[] {"-help"};
+        }
+        for (int i = 0; i < args.length; i++) {
+            if ("-bind".equals(args[i])) {
+                bind = args[++i];
+            } else if ("-sender".equals(args[i])) {
+                sender = args[++i];
+            } else if ("-port".equals(args[i])) {
+                port = Integer.parseInt(args[++i]);
+            } else if ("-count".equals(args[i])) {
+                count = Integer.parseInt(args[++i]);
+            } else if ("-pause".equals(args[i])) {
+                pause = Long.parseLong(args[++i])*1000;
+            } else if ("-gzip".equals(args[i])) {
+                gzip = true;
+            } else if ("-break".equals(args[i])) {
+                breakOnEx = true;
+            } else if ("-ack".equals(args[i])) {
+                ack = Boolean.parseBoolean(args[++i]);
+            } else if ("-sync".equals(args[i])) {
+                sync = Boolean.parseBoolean(args[++i]);
+            } else if ("-stats".equals(args[i])) {
+                stats = Integer.parseInt(args[++i]);
+                System.out.println("Stats every "+stats+" message");
+            } else if ("-mbind".equals(args[i])) {
+                mbind = args[++i];
+            } else if ("-mode".equals(args[i])) {
+                if ( "receive".equals(args[++i]) ) send = false;
+            } else if ("-debug".equals(args[i])) {
+                debug = true;
+            } else //("-help".equals(args[i])) 
+            {
+                usage();
+                System.exit(1);
+            }
+        }
+        
+        
+        ReplicationListener rl = new ReplicationListener();
+        rl.setTcpListenAddress(bind);
+        rl.setTcpListenPort(port);
+        rl.setTcpSelectorTimeout(100);
+        rl.setTcpThreadCount(4);
+        rl.getBind();
+        rl.setSendAck(ack);
+        rl.setSynchronized(sync);
+
+        ReplicationTransmitter ps = new ReplicationTransmitter();
+        ps.setReplicationMode(sender);
+        ps.setAckTimeout(15000);
+        ps.setAutoConnect(true);
+        ps.setWaitForAck(ack);
+
+        McastService service = new McastService();
+        service.setMcastAddr("228.0.0.5");
+        if ( mbind != null ) service.setMcastBindAddress(mbind);
+        service.setMcastFrequency(500);
+        service.setMcastDropTime(2000);
+        service.setMcastPort(45565);
+
+        ManagedChannel channel = new GroupChannel();
+        channel.setChannelReceiver(rl);
+        channel.setChannelSender(ps);
+        channel.setMembershipService(service);
+        
+        if ( gzip ) channel.addInterceptor(new GzipInterceptor());
+        
+        LoadTest test = new 
LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+        LoadMessage msg = new LoadMessage();
+        messageSize = LoadMessage.getMessageSize(msg);
+        channel.setChannelListener(test);
+        channel.setMembershipListener(test);
+        channel.start(channel.DEFAULT);
+        test.run();
+        System.out.println("System test complete, sleeping to let threads 
finish.");
+        Thread.sleep(60*1000*60);
+    }    
+    
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to