Author: fhanik
Date: Wed Feb 20 09:12:27 2008
New Revision: 629539
URL: http://svn.apache.org/viewvc?rev=629539&view=rev
Log:
Add buffer sizes to the UDP sockets
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
tomcat/trunk/webapps/docs/config/cluster-receiver.xml
tomcat/trunk/webapps/docs/config/cluster-sender.xml
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
(original)
+++
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
Wed Feb 20 09:12:27 2008
@@ -421,12 +421,15 @@
if ( log.isDebugEnabled() )
log.debug("Invalid member mcast package.",ax);
} catch ( Exception x ) {
- if (errorCounter==0) log.warn("Error receiving mcast
package. Sleeping 500ms",x);
- else log.debug("Error receiving mcast package. Sleeping
500ms",x);
- try { Thread.sleep(500); } catch ( Exception ignore ){}
- if ( (++errorCounter)>=recoveryCounter ) {
- errorCounter=0;
- new RecoveryThread(McastServiceImpl.this);
+ if (x instanceof InterruptedException) interrupted();
+ else {
+ if (errorCounter==0) log.warn("Error receiving mcast
package. Sleeping 500ms",x);
+ else log.debug("Error receiving mcast package.
Sleeping 500ms",x);
+ try { Thread.sleep(500); } catch ( Exception ignore ){}
+ if ( (++errorCounter)>=recoveryCounter ) {
+ errorCounter=0;
+ new RecoveryThread(McastServiceImpl.this);
+ }
}
}
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/AbstractSender.java
Wed Feb 20 09:12:27 2008
@@ -38,6 +38,8 @@
private boolean connected = false;
private int rxBufSize = 25188;
private int txBufSize = 43800;
+ private int udpRxBufSize = 25188;
+ private int udpTxBufSize = 43800;
private boolean directBuffer = false;
private int keepAliveCount = -1;
private int requestCount = 0;
@@ -330,6 +332,26 @@
public void setUdpPort(int udpPort) {
this.udpPort = udpPort;
+ }
+
+
+ public int getUdpRxBufSize() {
+ return udpRxBufSize;
+ }
+
+
+ public void setUdpRxBufSize(int udpRxBufSize) {
+ this.udpRxBufSize = udpRxBufSize;
+ }
+
+
+ public int getUdpTxBufSize() {
+ return udpTxBufSize;
+ }
+
+
+ public void setUdpTxBufSize(int udpTxBufSize) {
+ this.udpTxBufSize = udpTxBufSize;
}
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
Wed Feb 20 09:12:27 2008
@@ -56,6 +56,9 @@
private int securePort = -1;
private int rxBufSize = 43800;
private int txBufSize = 25188;
+ private int udpRxBufSize = 43800;
+ private int udpTxBufSize = 25188;
+
private boolean listen = false;
private RxTaskPool pool;
private boolean direct = true;
@@ -518,6 +521,22 @@
public void setUdpPort(int udpPort) {
this.udpPort = udpPort;
+ }
+
+ public int getUdpRxBufSize() {
+ return udpRxBufSize;
+ }
+
+ public void setUdpRxBufSize(int udpRxBufSize) {
+ this.udpRxBufSize = udpRxBufSize;
+ }
+
+ public int getUdpTxBufSize() {
+ return udpTxBufSize;
+ }
+
+ public void setUdpTxBufSize(int udpTxBufSize) {
+ this.udpTxBufSize = udpTxBufSize;
}
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
Wed Feb 20 09:12:27 2008
@@ -250,6 +250,11 @@
setListen(true);
if (selector!=null && datagramChannel!=null) {
ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size
for a datagram packet
+ datagramChannel.socket().setSendBufferSize(getUdpTxBufSize());
+ datagramChannel.socket().setReceiveBufferSize(getUdpRxBufSize());
+ datagramChannel.socket().setReuseAddress(getSoReuseAddress());
+ datagramChannel.socket().setSoTimeout(getTimeout());
+ datagramChannel.socket().setTrafficClass(getSoTrafficClass());
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
(original)
+++
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
Wed Feb 20 09:12:27 2008
@@ -308,8 +308,12 @@
int total = 0;
if (channel instanceof DatagramChannel) {
DatagramChannel dchannel = (DatagramChannel)channel;
- while ( total < command.length ) {
- total += dchannel.send(buf, udpaddr);
+ //we're using a shared channel, it's not thread safe
+ //TODO check optimization, one channel per thread
+ synchronized (dchannel) {
+ while ( total < command.length ) {
+ total += dchannel.send(buf, udpaddr);
+ }
}
} else {
while ( total < command.length ) {
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
Wed Feb 20 09:12:27 2008
@@ -149,8 +149,8 @@
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
socketChannel.socket().setTrafficClass(getSoTrafficClass());
} else if (dataChannel!=null) {
- dataChannel.socket().setSendBufferSize(getTxBufSize());
- dataChannel.socket().setReceiveBufferSize(getRxBufSize());
+ dataChannel.socket().setSendBufferSize(getUdpTxBufSize());
+ dataChannel.socket().setReceiveBufferSize(getUdpRxBufSize());
dataChannel.socket().setSoTimeout((int)getTimeout());
dataChannel.socket().setReuseAddress(getSoReuseAddress());
dataChannel.socket().setTrafficClass(getSoTrafficClass());
Modified:
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
---
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
(original)
+++
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
Wed Feb 20 09:12:27 2008
@@ -20,6 +20,8 @@
import java.io.Serializable;
import java.util.Random;
import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelListener;
@@ -33,6 +35,7 @@
import
org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
import
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+import org.apache.catalina.tribes.io.XByteBuffer;
/**
*/
@@ -81,11 +84,18 @@
channel1.send(new Member[] {channel2.getLocalMember(false)},
Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
Thread.sleep(500);
System.err.println("Finished Single package NO_ACK
["+listener1.count+"]");
- assertEquals("Checking success messages.",1,listener1.count);
+ assertEquals("Checking success messages.",1,listener1.count.get());
}
public void testDataSendNO_ACK() throws Exception {
+ final AtomicInteger counter = new AtomicInteger(0);
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+ rb1.setUdpRxBufSize(1024*1024*10);
+ rb2.setUdpRxBufSize(1024*1024*10);
+ rb1.setUdpTxBufSize(1024*1024*10);
+ rb2.setUdpTxBufSize(1024*1024*10);
System.err.println("Starting NO_ACK");
Thread[] threads = new Thread[threadCount];
for (int x=0; x<threads.length; x++ ) {
@@ -93,7 +103,11 @@
public void run() {
try {
long start = System.currentTimeMillis();
- for (int i = 0; i < msgCount; i++) channel1.send(new
Member[] {channel2.getLocalMember(false)},
Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
+ for (int i = 0; i < msgCount; i++) {
+ int cnt = counter.addAndGet(1);
+ channel1.send(new Member[]
{channel2.getLocalMember(false)},
Data.createRandomData(1024,cnt),Channel.SEND_OPTIONS_UDP);
+ Thread.currentThread().sleep(10);
+ }
System.out.println("Thread["+this.getName()+"] sent
"+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
}catch ( Exception x ) {
x.printStackTrace();
@@ -108,9 +122,19 @@
for (int x=0; x<threads.length; x++ ) { threads[x].join();}
//sleep for 50 sec, let the other messages in
long start = System.currentTimeMillis();
- while ( (System.currentTimeMillis()-start)<25000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+ while ( (System.currentTimeMillis()-start)<25000 &&
msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);
System.err.println("Finished NO_ACK ["+listener1.count+"]");
- assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count);
+ System.out.println("Sent "+counter.get()+ " messages. Received
"+listener1.count+" Highest msg received:"+listener1.maxIdx);
+ System.out.print("Missing messages:");
+ printMissingMsgs(listener1.nrs,counter.get());
+ assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count.get());
+ }
+
+ public static void printMissingMsgs(int[] msgs, int maxIdx) {
+ for (int i=0; i<maxIdx && i<msgs.length; i++) {
+ if (msgs[i]==0) System.out.print(i+", ");
+ }
+ System.out.println();
}
public void testDataSendASYNCM() throws Exception {
@@ -136,18 +160,18 @@
for (int x=0; x<threads.length; x++ ) { threads[x].join();}
//sleep for 50 sec, let the other messages in
long start = System.currentTimeMillis();
- while ( (System.currentTimeMillis()-start)<25000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+ while ( (System.currentTimeMillis()-start)<25000 &&
msgCount*threadCount!=listener1.count.get()) Thread.sleep(500);
System.err.println("Finished ASYNC MULTI THREAD
["+listener1.count+"]");
- assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count);
+ assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count.get());
}
public void testDataSendASYNC() throws Exception {
System.err.println("Starting ASYNC");
for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);
//sleep for 50 sec, let the other messages in
long start = System.currentTimeMillis();
- while ( (System.currentTimeMillis()-start)<5000 &&
msgCount!=listener1.count) Thread.sleep(500);
+ while ( (System.currentTimeMillis()-start)<5000 &&
msgCount!=listener1.count.get()) Thread.sleep(500);
System.err.println("Finished ASYNC");
- assertEquals("Checking success messages.",msgCount,listener1.count);
+ assertEquals("Checking success
messages.",msgCount,listener1.count.get());
}
public void testDataSendACK() throws Exception {
@@ -155,7 +179,7 @@
for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
Thread.sleep(250);
System.err.println("Finished ACK");
- assertEquals("Checking success messages.",msgCount,listener1.count);
+ assertEquals("Checking success
messages.",msgCount,listener1.count.get());
}
public void testDataSendSYNCACK() throws Exception {
@@ -163,24 +187,38 @@
for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
Thread.sleep(250);
System.err.println("Finished SYNC_ACK");
- assertEquals("Checking success messages.",msgCount,listener1.count);
+ assertEquals("Checking success
messages.",msgCount,listener1.count.get());
}
public static class Listener implements ChannelListener {
- long count = 0;
+ AtomicLong count = new AtomicLong(0);
+ int maxIdx = -1;
+ int[] nrs = new int[1000000];
+ public Listener() {
+ Arrays.fill(nrs, 0);
+ }
public boolean accept(Serializable s, Member m) {
return (s instanceof Data);
}
public void messageReceived(Serializable s, Member m) {
- Data d = (Data)s;
- if ( !Data.verify(d) ) {
- System.err.println("ERROR");
- } else {
- count++;
- if ((count %1000) ==0 ) {
- System.err.println("SUCCESS:"+count);
+ try {
+ Data d = (Data)s;
+ if ( !Data.verify(d) ) {
+ System.err.println("ERROR - Unable to verify data
package");
+ } else {
+ long c = count.addAndGet(1);
+ if ((c%1000) ==0 ) {
+ System.err.println("SUCCESS:"+c);
+ }
+ int nr = d.getNumber();
+ if (nr>=0 && nr<nrs.length) {
+ maxIdx = Math.max(maxIdx, nr);
+ nrs[nr] = 1;
+ }
}
+ }catch (Exception x ) {
+ x.printStackTrace();
}
}
}
@@ -189,25 +227,41 @@
public int length;
public byte[] data;
public byte key;
+ public boolean hasNr = false;
public static Random r = new Random(System.currentTimeMillis());
public static Data createRandomData() {
return createRandomData(ChannelReceiver.MAX_UDP_SIZE);
}
public static Data createRandomData(int size) {
+ return createRandomData(size,-1);
+ }
+
+ public static Data createRandomData(int size, int number) {
int i = r.nextInt();
i = ( i % 127 );
int length = Math.abs(r.nextInt() % size);
+ if (length<100) length += 100;
Data d = new Data();
d.length = length;
d.key = (byte)i;
d.data = new byte[length];
Arrays.fill(d.data,d.key);
+ if (number>0 && d.data.length>=4) {
+ //populate number
+ d.hasNr = true;
+ XByteBuffer.toBytes(number,d.data, 0);
+ }
return d;
}
+
+ public int getNumber() {
+ if (!hasNr) return -1;
+ return XByteBuffer.toInt(this.data, 0);
+ }
public static boolean verify(Data d) {
boolean result = (d.length == d.data.length);
- for ( int i=0; result && (i<d.data.length); i++ ) result = result
&& d.data[i] == d.key;
+ for ( int i=(d.hasNr?4:0); result && (i<d.data.length); i++ )
result = result && d.data[i] == d.key;
return result;
}
}
Modified: tomcat/trunk/webapps/docs/config/cluster-receiver.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/cluster-receiver.xml?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/config/cluster-receiver.xml (original)
+++ tomcat/trunk/webapps/docs/config/cluster-receiver.xml Wed Feb 20 09:12:27
2008
@@ -117,6 +117,14 @@
<attribute name="txBufSize" required="false">
The sending buffer size on the receiving sockets. Value is in bytes, the
default value is <code>25188</code> bytes.
</attribute>
+ <attribute name="udpRxBufSize" required="false">
+ The receive buffer size on the datagram socket.
+ Default value is <code>43800</code> bytes.
+ </attribute>
+ <attribute name="udpTxBufSize" required="false">
+ The send buffer size on the datagram socket.
+ Default value is <code>25188</code> bytes.
+ </attribute>
<attribute name="soKeepAlive" required="false">
Boolean value for the socket SO_KEEPALIVE option. Possible values are
<code>true</code> or <code>false</code>.
</attribute>
Modified: tomcat/trunk/webapps/docs/config/cluster-sender.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/cluster-sender.xml?rev=629539&r1=629538&r2=629539&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/config/cluster-sender.xml (original)
+++ tomcat/trunk/webapps/docs/config/cluster-sender.xml Wed Feb 20 09:12:27 2008
@@ -90,6 +90,14 @@
The send buffer size on the socket.
Default value is <code>43800</code> bytes.
</attribute>
+ <attribute name="udpRxBufSize" required="false">
+ The receive buffer size on the datagram socket.
+ Default value is <code>25188</code> bytes.
+ </attribute>
+ <attribute name="udpTxBufSize" required="false">
+ The send buffer size on the datagram socket.
+ Default value is <code>43800</code> bytes.
+ </attribute>
<attribute name="direct" required="false">
Possible values are <code>true</code> or <code>false</code>.
Set to true if you want the receiver to use direct bytebuffers when
reading data
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]