Author: fhanik
Date: Wed Feb 20 09:12:27 2008
New Revision: 629539

URL: http://svn.apache.org/viewvc?rev=629539&view=rev
Log:
Add buffer sizes to the UDP sockets

Modified:
    
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
    
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
    tomcat/trunk/webapps/docs/config/cluster-receiver.xml
    tomcat/trunk/webapps/docs/config/cluster-sender.xml

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java 
(original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java 
Wed Feb 20 09:12:27 2008
@@ -421,12 +421,15 @@
                     if ( log.isDebugEnabled() )
                         log.debug("Invalid member mcast package.",ax);
                 } catch ( Exception x ) {
-                    if (errorCounter==0) log.warn("Error receiving mcast 
package. Sleeping 500ms",x);
-                    else log.debug("Error receiving mcast package. Sleeping 
500ms",x);
-                    try { Thread.sleep(500); } catch ( Exception ignore ){}
-                    if ( (++errorCounter)>=recoveryCounter ) {
-                        errorCounter=0;
-                        new RecoveryThread(McastServiceImpl.this);
+                    if (x instanceof InterruptedException) interrupted();
+                    else {
+                        if (errorCounter==0) log.warn("Error receiving mcast 
package. Sleeping 500ms",x);
+                        else log.debug("Error receiving mcast package. 
Sleeping 500ms",x);
+                        try { Thread.sleep(500); } catch ( Exception ignore ){}
+                        if ( (++errorCounter)>=recoveryCounter ) {
+                            errorCounter=0;
+                            new RecoveryThread(McastServiceImpl.this);
+                        }
                     }
                 }
             }

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java 
Wed Feb 20 09:12:27 2008
@@ -38,6 +38,8 @@
     private boolean connected = false;
     private int rxBufSize = 25188;
     private int txBufSize = 43800;
+    private int udpRxBufSize = 25188;
+    private int udpTxBufSize = 43800;
     private boolean directBuffer = false;
     private int keepAliveCount = -1;
     private int requestCount = 0;
@@ -330,6 +332,26 @@
 
     public void setUdpPort(int udpPort) {
         this.udpPort = udpPort;
+    }
+
+
+    public int getUdpRxBufSize() {
+        return udpRxBufSize;
+    }
+
+
+    public void setUdpRxBufSize(int udpRxBufSize) {
+        this.udpRxBufSize = udpRxBufSize;
+    }
+
+
+    public int getUdpTxBufSize() {
+        return udpTxBufSize;
+    }
+
+
+    public void setUdpTxBufSize(int udpTxBufSize) {
+        this.udpTxBufSize = udpTxBufSize;
     }
 
 }

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java 
Wed Feb 20 09:12:27 2008
@@ -56,6 +56,9 @@
     private int securePort = -1;
     private int rxBufSize = 43800;
     private int txBufSize = 25188;
+    private int udpRxBufSize = 43800;
+    private int udpTxBufSize = 25188;
+
     private boolean listen = false;
     private RxTaskPool pool;
     private boolean direct = true;
@@ -518,6 +521,22 @@
 
     public void setUdpPort(int udpPort) {
         this.udpPort = udpPort;
+    }
+
+    public int getUdpRxBufSize() {
+        return udpRxBufSize;
+    }
+
+    public void setUdpRxBufSize(int udpRxBufSize) {
+        this.udpRxBufSize = udpRxBufSize;
+    }
+
+    public int getUdpTxBufSize() {
+        return udpTxBufSize;
+    }
+
+    public void setUdpTxBufSize(int udpTxBufSize) {
+        this.udpTxBufSize = udpTxBufSize;
     }
 
 }

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java 
Wed Feb 20 09:12:27 2008
@@ -250,6 +250,11 @@
         setListen(true);
         if (selector!=null && datagramChannel!=null) {
             ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size 
for a datagram packet
+            datagramChannel.socket().setSendBufferSize(getUdpTxBufSize());
+            datagramChannel.socket().setReceiveBufferSize(getUdpRxBufSize());
+            datagramChannel.socket().setReuseAddress(getSoReuseAddress());
+            datagramChannel.socket().setSoTimeout(getTimeout());
+            datagramChannel.socket().setTrafficClass(getSoTrafficClass());
             
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
         }
 

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
 (original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
 Wed Feb 20 09:12:27 2008
@@ -308,8 +308,12 @@
             int total = 0;
             if (channel instanceof DatagramChannel) {
                 DatagramChannel dchannel = (DatagramChannel)channel;
-                while ( total < command.length ) {
-                    total += dchannel.send(buf, udpaddr);
+                //we're using a shared channel, it's not thread safe
+                //TODO check optimization, one channel per thread
+                synchronized (dchannel) {
+                    while ( total < command.length ) {
+                        total += dchannel.send(buf, udpaddr);
+                    }
                 }
             } else {
                 while ( total < command.length ) {

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java 
Wed Feb 20 09:12:27 2008
@@ -149,8 +149,8 @@
             
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
             socketChannel.socket().setTrafficClass(getSoTrafficClass());
         } else if (dataChannel!=null) {
-            dataChannel.socket().setSendBufferSize(getTxBufSize());
-            dataChannel.socket().setReceiveBufferSize(getRxBufSize());
+            dataChannel.socket().setSendBufferSize(getUdpTxBufSize());
+            dataChannel.socket().setReceiveBufferSize(getUdpRxBufSize());
             dataChannel.socket().setSoTimeout((int)getTimeout());
             dataChannel.socket().setReuseAddress(getSoReuseAddress());
             dataChannel.socket().setTrafficClass(getSoTrafficClass());

Modified: 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java 
(original)
+++ 
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java 
Wed Feb 20 09:12:27 2008
@@ -20,6 +20,8 @@
 import java.io.Serializable;
 import java.util.Random;
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelListener;
@@ -33,6 +35,7 @@
 import 
org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
 import 
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
 import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+import org.apache.catalina.tribes.io.XByteBuffer;
 
 /**
  */
@@ -81,11 +84,18 @@
         channel1.send(new Member[] {channel2.getLocalMember(false)}, 
Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
         Thread.sleep(500);
         System.err.println("Finished Single package NO_ACK 
["+listener1.count+"]");
-        assertEquals("Checking success messages.",1,listener1.count);
+        assertEquals("Checking success messages.",1,listener1.count.get());
     }
 
     
     public void testDataSendNO_ACK() throws Exception {
+        final AtomicInteger counter = new AtomicInteger(0);
+        ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+        ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+        rb1.setUdpRxBufSize(1024*1024*10);
+        rb2.setUdpRxBufSize(1024*1024*10);
+        rb1.setUdpTxBufSize(1024*1024*10);
+        rb2.setUdpTxBufSize(1024*1024*10);
         System.err.println("Starting NO_ACK");
         Thread[] threads = new Thread[threadCount];
         for (int x=0; x<threads.length; x++ ) {
@@ -93,7 +103,11 @@
                 public void run() {
                     try {
                         long start = System.currentTimeMillis();
-                        for (int i = 0; i < msgCount; i++) channel1.send(new 
Member[] {channel2.getLocalMember(false)}, 
Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
+                        for (int i = 0; i < msgCount; i++) {
+                            int cnt = counter.addAndGet(1);
+                            channel1.send(new Member[] 
{channel2.getLocalMember(false)}, 
Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP);
+                            Thread.currentThread().sleep(10);
+                        }
                         System.out.println("Thread["+this.getName()+"] sent 
"+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
                     }catch ( Exception x ) {
                         x.printStackTrace();
@@ -108,9 +122,19 @@
         for (int x=0; x<threads.length; x++ ) { threads[x].join();}
         //wait up to 25 sec, let the other messages in
         long start = System.currentTimeMillis();
-        while ( (System.currentTimeMillis()-start)<25000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+        while ( (System.currentTimeMillis()-start)<25000 && 
msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);
         System.err.println("Finished NO_ACK ["+listener1.count+"]");
-        assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count);
+        System.out.println("Sent "+counter.get()+ " messages. Received 
"+listener1.count+" Highest msg received:"+listener1.maxIdx);
+        System.out.print("Missing messages:");
+        printMissingMsgs(listener1.nrs,counter.get());
+        assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count.get());
+    }
+    
+    public static void printMissingMsgs(int[] msgs, int maxIdx) {
+        for (int i=0; i<maxIdx && i<msgs.length; i++) {
+            if (msgs[i]==0) System.out.print(i+", ");
+        }
+        System.out.println();
     }
 
     public void testDataSendASYNCM() throws Exception {
@@ -136,18 +160,18 @@
             for (int x=0; x<threads.length; x++ ) { threads[x].join();}
             //wait up to 25 sec, let the other messages in
             long start = System.currentTimeMillis();
-            while ( (System.currentTimeMillis()-start)<25000 && 
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+            while ( (System.currentTimeMillis()-start)<25000 && 
msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);
             System.err.println("Finished ASYNC MULTI THREAD 
["+listener1.count+"]");
-            assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count);
+            assertEquals("Checking success 
messages.",msgCount*threadCount,listener1.count.get());
     }
     public void testDataSendASYNC() throws Exception {
         System.err.println("Starting ASYNC");
         for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);
         //wait up to 5 sec, let the other messages in
         long start = System.currentTimeMillis();
-        while ( (System.currentTimeMillis()-start)<5000 && 
msgCount!=listener1.count) Thread.sleep(500);
+        while ( (System.currentTimeMillis()-start)<5000 && 
msgCount!=listener1.count.get()) Thread.sleep(500);
         System.err.println("Finished ASYNC");
-        assertEquals("Checking success messages.",msgCount,listener1.count);
+        assertEquals("Checking success 
messages.",msgCount,listener1.count.get());
     }
 
     public void testDataSendACK() throws Exception {
@@ -155,7 +179,7 @@
         for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
         Thread.sleep(250);
         System.err.println("Finished ACK");
-        assertEquals("Checking success messages.",msgCount,listener1.count);
+        assertEquals("Checking success 
messages.",msgCount,listener1.count.get());
     }
 
     public void testDataSendSYNCACK() throws Exception {
@@ -163,24 +187,38 @@
         for (int i=0; i<msgCount; i++) channel1.send(new Member[] 
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
         Thread.sleep(250);
         System.err.println("Finished SYNC_ACK");
-        assertEquals("Checking success messages.",msgCount,listener1.count);
+        assertEquals("Checking success 
messages.",msgCount,listener1.count.get());
     }
 
     public static class Listener implements ChannelListener {
-        long count = 0;
+        AtomicLong count = new AtomicLong(0);
+        int maxIdx = -1;
+        int[] nrs = new int[1000000];
+        public Listener() {
+            Arrays.fill(nrs, 0);
+        }
         public boolean accept(Serializable s, Member m) {
             return (s instanceof Data);
         }
 
         public void messageReceived(Serializable s, Member m) {
-            Data d = (Data)s;
-            if ( !Data.verify(d) ) {
-                System.err.println("ERROR");
-            } else {
-                count++;
-                if ((count %1000) ==0 ) {
-                    System.err.println("SUCCESS:"+count);
+            try {
+                Data d = (Data)s;
+                if ( !Data.verify(d) ) {
+                    System.err.println("ERROR - Unable to verify data 
package");
+                } else {
+                    long c = count.addAndGet(1);
+                    if ((c%1000) ==0 ) {
+                        System.err.println("SUCCESS:"+c);
+                    }
+                    int nr = d.getNumber();
+                    if (nr>=0 && nr<nrs.length) {
+                        maxIdx = Math.max(maxIdx, nr);
+                        nrs[nr] = 1;
+                    }
                 }
+            }catch (Exception x ) {
+                x.printStackTrace();
             }
         }
     }
@@ -189,25 +227,41 @@
         public int length;
         public byte[] data;
         public byte key;
+        public boolean hasNr = false;
         public static Random r = new Random(System.currentTimeMillis());
         public static Data createRandomData() {
             return createRandomData(ChannelReceiver.MAX_UDP_SIZE);
         }
         public static Data createRandomData(int size) {
+            return createRandomData(size,-1);
+        }
+        
+        public static Data createRandomData(int size, int number) {
             int i = r.nextInt();
             i = ( i % 127 );
             int length = Math.abs(r.nextInt() % size);
+            if (length<100) length += 100;
             Data d = new Data();
             d.length = length;
             d.key = (byte)i;
             d.data = new byte[length];
             Arrays.fill(d.data,d.key);
+            if (number>0 && d.data.length>=4) {
+                //populate number
+                d.hasNr = true;
+                XByteBuffer.toBytes(number,d.data, 0);
+            }
             return d;
         }
+        
+        public int getNumber() {
+            if (!hasNr) return -1;
+            return XByteBuffer.toInt(this.data, 0);
+        }
 
         public static boolean verify(Data d) {
             boolean result = (d.length == d.data.length);
-            for ( int i=0; result && (i<d.data.length); i++ ) result = result 
&& d.data[i] == d.key;
+            for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ ) 
result = result && d.data[i] == d.key;
             return result;
         }
     }

Modified: tomcat/trunk/webapps/docs/config/cluster-receiver.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/cluster-receiver.xml?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/config/cluster-receiver.xml (original)
+++ tomcat/trunk/webapps/docs/config/cluster-receiver.xml Wed Feb 20 09:12:27 
2008
@@ -117,6 +117,14 @@
     <attribute name="txBufSize" required="false">
       The sending buffer size on the receiving sockets. Value is in bytes, the 
default value is <code>25188</code> bytes.
     </attribute>
+    <attribute name="udpRxBufSize" required="false">
+        The receive buffer size on the datagram socket.
+        Default value is <code>43800</code> bytes.
+    </attribute>
+    <attribute name="udpTxBufSize" required="false">
+       The send buffer size on the datagram socket.
+       Default value is <code>25188</code> bytes.
+    </attribute>
     <attribute name="soKeepAlive" required="false">
       Boolean value for the socket SO_KEEPALIVE option. Possible values are 
<code>true</code> or <code>false</code>.
     </attribute>

Modified: tomcat/trunk/webapps/docs/config/cluster-sender.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/cluster-sender.xml?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/config/cluster-sender.xml (original)
+++ tomcat/trunk/webapps/docs/config/cluster-sender.xml Wed Feb 20 09:12:27 2008
@@ -90,6 +90,14 @@
        The send buffer size on the socket.
        Default value is <code>43800</code> bytes.
       </attribute>
+      <attribute name="udpRxBufSize" required="false">
+        The receive buffer size on the datagram socket.
+        Default value is <code>25188</code> bytes.
+      </attribute>
+      <attribute name="udpTxBufSize" required="false">
+       The send buffer size on the datagram socket.
+       Default value is <code>43800</code> bytes.
+      </attribute>
       <attribute name="direct" required="false">
        Possible values are <code>true</code> or <code>false</code>. 
        Set to true if you want the receiver to use direct bytebuffers when 
reading data



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to