Author: fhanik Date: Wed Feb 20 09:12:27 2008 New Revision: 629539 URL: http://svn.apache.org/viewvc?rev=629539&view=rev Log: Add buffer sizes to the UDP sockets
Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java tomcat/trunk/webapps/docs/config/cluster-receiver.xml tomcat/trunk/webapps/docs/config/cluster-sender.xml Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Wed Feb 20 09:12:27 2008 @@ -421,12 +421,15 @@ if ( log.isDebugEnabled() ) log.debug("Invalid member mcast package.",ax); } catch ( Exception x ) { - if (errorCounter==0) log.warn("Error receiving mcast package. Sleeping 500ms",x); - else log.debug("Error receiving mcast package. Sleeping 500ms",x); - try { Thread.sleep(500); } catch ( Exception ignore ){} - if ( (++errorCounter)>=recoveryCounter ) { - errorCounter=0; - new RecoveryThread(McastServiceImpl.this); + if (x instanceof InterruptedException) interrupted(); + else { + if (errorCounter==0) log.warn("Error receiving mcast package. Sleeping 500ms",x); + else log.debug("Error receiving mcast package. Sleeping 500ms",x); + try { Thread.sleep(500); } catch ( Exception ignore ){} + if ( (++errorCounter)>=recoveryCounter ) { + errorCounter=0; + new RecoveryThread(McastServiceImpl.this); + } } } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java Wed Feb 20 09:12:27 2008 @@ -38,6 +38,8 @@ private boolean connected = false; private int rxBufSize = 25188; private int txBufSize = 43800; + private int udpRxBufSize = 25188; + private int udpTxBufSize = 43800; private boolean directBuffer = false; private int keepAliveCount = -1; private int requestCount = 0; @@ -330,6 +332,26 @@ public void setUdpPort(int udpPort) { this.udpPort = udpPort; + } + + + public int getUdpRxBufSize() { + return udpRxBufSize; + } + + + public void setUdpRxBufSize(int udpRxBufSize) { + this.udpRxBufSize = udpRxBufSize; + } + + + public int getUdpTxBufSize() { + return udpTxBufSize; + } + + + public void setUdpTxBufSize(int udpTxBufSize) { + this.udpTxBufSize = udpTxBufSize; } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Wed Feb 20 09:12:27 2008 @@ -56,6 +56,9 @@ private int securePort = -1; private int rxBufSize = 43800; private int txBufSize = 25188; + private int udpRxBufSize = 43800; + private int udpTxBufSize = 25188; + private boolean listen = false; private RxTaskPool pool; private boolean direct = true; @@ -518,6 +521,22 @@ public void setUdpPort(int udpPort) { this.udpPort = udpPort; + } + + public int getUdpRxBufSize() { + return udpRxBufSize; + } + + public void setUdpRxBufSize(int udpRxBufSize) { + this.udpRxBufSize = udpRxBufSize; + } + + public int getUdpTxBufSize() { + return udpTxBufSize; + } + + public void setUdpTxBufSize(int udpTxBufSize) { + this.udpTxBufSize = udpTxBufSize; } } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Wed Feb 20 09:12:27 2008 @@ -250,6 +250,11 @@ setListen(true); if (selector!=null && datagramChannel!=null) { ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size for a datagram packet + datagramChannel.socket().setSendBufferSize(getUdpTxBufSize()); + datagramChannel.socket().setReceiveBufferSize(getUdpRxBufSize()); + datagramChannel.socket().setReuseAddress(getSoReuseAddress()); + datagramChannel.socket().setSoTimeout(getTimeout()); + datagramChannel.socket().setTrafficClass(getSoTrafficClass()); registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader); } Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Wed Feb 20 09:12:27 2008 @@ -308,8 +308,12 @@ int total = 0; if (channel instanceof DatagramChannel) { DatagramChannel dchannel = (DatagramChannel)channel; - while ( total < command.length ) { - total += dchannel.send(buf, udpaddr); + //were using a shared channel, it's not thread safe + //TODO check optimization, one channel per thread + synchronized (dchannel) { + while ( total < command.length ) { + total += dchannel.send(buf, udpaddr); + } } } else { while ( total < command.length ) { Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java Wed Feb 20 09:12:27 2008 @@ -149,8 +149,8 @@ socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); socketChannel.socket().setTrafficClass(getSoTrafficClass()); } else if (dataChannel!=null) { - dataChannel.socket().setSendBufferSize(getTxBufSize()); - dataChannel.socket().setReceiveBufferSize(getRxBufSize()); + dataChannel.socket().setSendBufferSize(getUdpTxBufSize()); + dataChannel.socket().setReceiveBufferSize(getUdpRxBufSize()); dataChannel.socket().setSoTimeout((int)getTimeout()); dataChannel.socket().setReuseAddress(getSoReuseAddress()); dataChannel.socket().setTrafficClass(getSoTrafficClass()); Modified: tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java (original) +++ tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java Wed Feb 20 09:12:27 2008 @@ -20,6 +20,8 @@ import java.io.Serializable; import java.util.Random; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelListener; @@ -33,6 +35,7 @@ import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor; import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor; +import org.apache.catalina.tribes.io.XByteBuffer; /** */ @@ -81,11 +84,18 @@ channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP); Thread.sleep(500); System.err.println("Finished Single package NO_ACK ["+listener1.count+"]"); - assertEquals("Checking success messages.",1,listener1.count); + assertEquals("Checking success messages.",1,listener1.count.get()); } public void testDataSendNO_ACK() throws Exception { + final AtomicInteger counter = new AtomicInteger(0); + ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver(); + ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver(); + rb1.setUdpRxBufSize(1024*1024*10); + rb2.setUdpRxBufSize(1024*1024*10); + rb1.setUdpTxBufSize(1024*1024*10); + rb2.setUdpTxBufSize(1024*1024*10); System.err.println("Starting NO_ACK"); Thread[] threads = new Thread[threadCount]; for (int x=0; x<threads.length; x++ ) { @@ -93,7 +103,11 @@ public void run() { try { long start = System.currentTimeMillis(); - for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP); + for (int i = 0; i < msgCount; i++) { + int cnt = counter.addAndGet(1); + channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP); + Thread.currentThread().sleep(10); + } System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms."); }catch ( Exception x ) { x.printStackTrace(); @@ -108,9 +122,19 @@ for (int x=0; x<threads.length; x++ ) { threads[x].join();} //sleep for 50 sec, let the other messages in long start = System.currentTimeMillis(); - while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); + while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500); System.err.println("Finished NO_ACK ["+listener1.count+"]"); - assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); + System.out.println("Sent "+counter.get()+ " messages. Received "+listener1.count+" Highest msg received:"+listener1.maxIdx); + System.out.print("Missing messages:"); + printMissingMsgs(listener1.nrs,counter.get()); + assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get()); + } + + public static void printMissingMsgs(int[] msgs, int maxIdx) { + for (int i=0; i<maxIdx && i<msgs.length; i++) { + if (msgs[i]==0) System.out.print(i+", "); + } + System.out.println(); } public void testDataSendASYNCM() throws Exception { @@ -136,18 +160,18 @@ for (int x=0; x<threads.length; x++ ) { threads[x].join();} //sleep for 50 sec, let the other messages in long start = System.currentTimeMillis(); - while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count) Thread.sleep(500); + while ( (System.currentTimeMillis()-start)<25000 && msgCount*threadCount!=listener1.count.get()) Thread.sleep(500); System.err.println("Finished ASYNC MULTI THREAD ["+listener1.count+"]"); - assertEquals("Checking success messages.",msgCount*threadCount,listener1.count); + assertEquals("Checking success messages.",msgCount*threadCount,listener1.count.get()); } public void testDataSendASYNC() throws Exception { System.err.println("Starting ASYNC"); for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP); //sleep for 50 sec, let the other messages in long start = System.currentTimeMillis(); - while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500); + while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count.get()) Thread.sleep(500); System.err.println("Finished ASYNC"); - assertEquals("Checking success messages.",msgCount,listener1.count); + assertEquals("Checking success messages.",msgCount,listener1.count.get()); } public void testDataSendACK() throws Exception { @@ -155,7 +179,7 @@ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP); Thread.sleep(250); System.err.println("Finished ACK"); - assertEquals("Checking success messages.",msgCount,listener1.count); + assertEquals("Checking success messages.",msgCount,listener1.count.get()); } public void testDataSendSYNCACK() throws Exception { @@ -163,24 +187,38 @@ for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP); Thread.sleep(250); System.err.println("Finished SYNC_ACK"); - assertEquals("Checking success messages.",msgCount,listener1.count); + assertEquals("Checking success messages.",msgCount,listener1.count.get()); } public static class Listener implements ChannelListener { - long count = 0; + AtomicLong count = new AtomicLong(0); + int maxIdx = -1; + int[] nrs = new int[1000000]; + public Listener() { + Arrays.fill(nrs, 0); + } public boolean accept(Serializable s, Member m) { return (s instanceof Data); } public void messageReceived(Serializable s, Member m) { - Data d = (Data)s; - if ( !Data.verify(d) ) { - System.err.println("ERROR"); - } else { - count++; - if ((count %1000) ==0 ) { - System.err.println("SUCCESS:"+count); + try { + Data d = (Data)s; + if ( !Data.verify(d) ) { + System.err.println("ERROR - Unable to verify data package"); + } else { + long c = count.addAndGet(1); + if ((c%1000) ==0 ) { + System.err.println("SUCCESS:"+c); + } + int nr = d.getNumber(); + if (nr>=0 && nr<nrs.length) { + maxIdx = Math.max(maxIdx, nr); + nrs[nr] = 1; + } } + }catch (Exception x ) { + x.printStackTrace(); } } } @@ -189,25 +227,41 @@ public int length; public byte[] data; public byte key; + public boolean hasNr = false; public static Random r = new Random(System.currentTimeMillis()); public static Data createRandomData() { return createRandomData(ChannelReceiver.MAX_UDP_SIZE); } public static Data createRandomData(int size) { + return createRandomData(size,-1); + } + + public static Data createRandomData(int size, int number) { int i = r.nextInt(); i = ( i % 127 ); int length = Math.abs(r.nextInt() % size); + if (length<100) length += 100; Data d = new Data(); d.length = length; d.key = (byte)i; d.data = new byte[length]; Arrays.fill(d.data,d.key); + if (number>0 && d.data.length>=4) { + //populate number + d.hasNr = true; + XByteBuffer.toBytes(number,d.data, 0); + } return d; } + + public int getNumber() { + if (!hasNr) return -1; + return XByteBuffer.toInt(this.data, 0); + } public static boolean verify(Data d) { boolean result = (d.length == d.data.length); - for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key; + for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key; return result; } } Modified: tomcat/trunk/webapps/docs/config/cluster-receiver.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/cluster-receiver.xml?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/config/cluster-receiver.xml (original) +++ tomcat/trunk/webapps/docs/config/cluster-receiver.xml Wed Feb 20 09:12:27 2008 @@ -117,6 +117,14 @@ <attribute name="txBufSize" required="false"> The sending buffer size on the receiving sockets. Value is in bytes, the default value is <code>25188</code> bytes. </attribute> + <attribute name="udpRxBufSize" required="false"> + The receive buffer size on the datagram socket. + Default value is <code>25188</code> bytes. + </attribute> + <attribute name="udpTxBufSize" required="false"> + The send buffer size on the datagram socket. + Default value is <code>43800</code> bytes. + </attribute> <attribute name="soKeepAlive" required="false"> Boolean value for the socket SO_KEEPALIVE option. Possible values are <code>true</code> or <code>false</code>. </attribute> Modified: tomcat/trunk/webapps/docs/config/cluster-sender.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/cluster-sender.xml?rev=629539&r1=629538&r2=629539&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/config/cluster-sender.xml (original) +++ tomcat/trunk/webapps/docs/config/cluster-sender.xml Wed Feb 20 09:12:27 2008 @@ -90,6 +90,14 @@ The send buffer size on the socket. Default value is <code>43800</code> bytes. </attribute> + <attribute name="udpRxBufSize" required="false"> + The receive buffer size on the datagram socket. + Default value is <code>25188</code> bytes. + </attribute> + <attribute name="udpTxBufSize" required="false"> + The send buffer size on the datagram socket. + Default value is <code>43800</code> bytes. + </attribute> <attribute name="direct" required="false"> Possible values are <code>true</code> or <code>false</code>. Set to true if you want the receiver to use direct bytebuffers when reading data --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]