Author: fhanik
Date: Tue Feb 19 12:45:54 2008
New Revision: 629223
URL: http://svn.apache.org/viewvc?rev=629223&view=rev
Log:
more UDP impl
Added:
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
Modified: tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java Tue Feb
19 12:45:54 2008
@@ -27,6 +27,8 @@
* @version $Revision$, $Date$
*/
public interface ChannelReceiver extends Heartbeat {
+ public static final int MAX_UDP_SIZE = 65535;
+
/**
* Start listening for incoming messages on the host/port
* @throws java.io.IOException
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
(original)
+++
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
Tue Feb 19 12:45:54 2008
@@ -335,7 +335,10 @@
}
};
} //end if
- if ( t != null ) t.start();
+ if ( t != null ) {
+ t.setDaemon(true);
+ t.start();
+ }
}
} catch (SocketTimeoutException x ) {
//do nothing, this is normal, we don't want to block forever
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
Tue Feb 19 12:45:54 2008
@@ -46,7 +46,6 @@
public static final int OPTION_DIRECT_BUFFER = 0x0004;
-
protected static org.apache.juli.logging.Log log =
org.apache.juli.logging.LogFactory.getLog(ReceiverBase.class);
private MessageListener listener;
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
Tue Feb 19 12:45:54 2008
@@ -249,7 +249,7 @@
setListen(true);
if (selector!=null && datagramChannel!=null) {
- ObjectReader oreader = new ObjectReader(65535); //max size for a
datagram packet
+ ObjectReader oreader = new ObjectReader(MAX_UDP_SIZE); //max size
for a datagram packet
registerChannel(selector,datagramChannel,SelectionKey.OP_READ,oreader);
}
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
(original)
+++
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
Tue Feb 19 12:45:54 2008
@@ -33,6 +33,8 @@
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.BufferPool;
import java.nio.channels.CancelledKeyException;
+
+import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.util.Logs;
@@ -68,10 +70,14 @@
// loop forever waiting for work to do
public synchronized void run() {
if ( buffer == null ) {
+ int size = getRxBufSize();
+ if (key.channel() instanceof DatagramChannel) {
+ size = ChannelReceiver.MAX_UDP_SIZE;
+ }
if ( (getOptions() & OPTION_DIRECT_BUFFER) ==
OPTION_DIRECT_BUFFER) {
- buffer = ByteBuffer.allocateDirect(getRxBufSize());
+ buffer = ByteBuffer.allocateDirect(size);
} else {
- buffer = ByteBuffer.allocate(getRxBufSize());
+ buffer = ByteBuffer.allocate(size);
}
} else {
buffer.clear();
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
Tue Feb 19 12:45:54 2008
@@ -137,16 +137,24 @@
connecting = false;
setRequestCount(0);
setConnectTime(System.currentTimeMillis());
- socketChannel.socket().setSendBufferSize(getTxBufSize());
- socketChannel.socket().setReceiveBufferSize(getRxBufSize());
- socketChannel.socket().setSoTimeout((int)getTimeout());
-
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
- socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
- socketChannel.socket().setKeepAlive(getSoKeepAlive());
- socketChannel.socket().setReuseAddress(getSoReuseAddress());
- socketChannel.socket().setOOBInline(getOoBInline());
- socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
- socketChannel.socket().setTrafficClass(getSoTrafficClass());
+ if (socketChannel!=null) {
+ socketChannel.socket().setSendBufferSize(getTxBufSize());
+ socketChannel.socket().setReceiveBufferSize(getRxBufSize());
+ socketChannel.socket().setSoTimeout((int)getTimeout());
+
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerOn()?getSoLingerTime():0);
+ socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
+ socketChannel.socket().setKeepAlive(getSoKeepAlive());
+ socketChannel.socket().setReuseAddress(getSoReuseAddress());
+ socketChannel.socket().setOOBInline(getOoBInline());
+
socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime());
+ socketChannel.socket().setTrafficClass(getSoTrafficClass());
+ } else if (dataChannel!=null) {
+ dataChannel.socket().setSendBufferSize(getTxBufSize());
+ dataChannel.socket().setReceiveBufferSize(getRxBufSize());
+ dataChannel.socket().setSoTimeout((int)getTimeout());
+ dataChannel.socket().setReuseAddress(getSoReuseAddress());
+ dataChannel.socket().setTrafficClass(getSoTrafficClass());
+ }
}
@@ -224,6 +232,9 @@
dataChannel = DatagramChannel.open();
dataChannel.configureBlocking(false);
dataChannel.connect(daddr);
+ completeConnect();
+ dataChannel.register(getSelector(),SelectionKey.OP_WRITE, this);
+
} else {
InetSocketAddress addr = new
InetSocketAddress(getAddress(),getPort());
if ( socketChannel != null ) throw new IOException("Socket channel
has already been established. Connection might be in progress.");
Modified:
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?rev=629223&r1=629222&r2=629223&view=diff
==============================================================================
---
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
(original)
+++
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
Tue Feb 19 12:45:54 2008
@@ -28,12 +28,12 @@
import
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
/**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
* <p>Company: </p>
- *
+ *
* @author not attributable
* @version 1.0
*/
@@ -61,7 +61,7 @@
channel1.stop(GroupChannel.DEFAULT);
channel2.stop(GroupChannel.DEFAULT);
}
-
+
public void testDataSendNO_ACK() throws Exception {
System.err.println("Starting NO_ACK");
Thread[] threads = new Thread[threadCount];
@@ -89,7 +89,7 @@
System.err.println("Finished NO_ACK ["+listener1.count+"]");
assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count);
}
-
+
public void testDataSendASYNCM() throws Exception {
System.err.println("Starting ASYNC MULTI THREAD");
Thread[] threads = new Thread[threadCount];
@@ -113,7 +113,7 @@
for (int x=0; x<threads.length; x++ ) { threads[x].join();}
//sleep for 50 sec, let the other messages in
long start = System.currentTimeMillis();
- while ( (System.currentTimeMillis()-start)<15000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+ while ( (System.currentTimeMillis()-start)<25000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
System.err.println("Finished ASYNC MULTI THREAD
["+listener1.count+"]");
assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count);
}
@@ -148,7 +148,7 @@
public boolean accept(Serializable s, Member m) {
return (s instanceof Data);
}
-
+
public void messageReceived(Serializable s, Member m) {
Data d = (Data)s;
if ( !Data.verify(d) ) {
@@ -161,7 +161,7 @@
}
}
}
-
+
public static class Data implements Serializable {
public int length;
public byte[] data;
@@ -178,14 +178,14 @@
Arrays.fill(d.data,d.key);
return d;
}
-
+
public static boolean verify(Data d) {
boolean result = (d.length == d.data.length);
for ( int i=0; result && (i<d.data.length); i++ ) result = result
&& d.data[i] == d.key;
return result;
}
}
-
-
+
+
}
Added:
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java?rev=629223&view=auto
==============================================================================
---
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
(added)
+++
tomcat/trunk/test/org/apache/catalina/tribes/test/channel/TestUdpPackages.java
Tue Feb 19 12:45:54 2008
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
+import org.apache.catalina.tribes.transport.AbstractSender;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import
org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import
org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+
+/** Sends messages between two GroupChannels using Channel.SEND_OPTIONS_UDP and counts arrivals.
+ */
+public class TestUdpPackages extends TestCase {
+ int msgCount = 500;
+ int threadCount = 20;
+ GroupChannel channel1;
+ GroupChannel channel2;
+ Listener listener1;
+ int threadCounter = 0;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel1 = new GroupChannel();
+ channel1.addInterceptor(new MessageDispatch15Interceptor());
+ channel2 = new GroupChannel();
+ channel2.addInterceptor(new MessageDispatch15Interceptor());
+ ThroughputInterceptor tint = new ThroughputInterceptor();
+ tint.setInterval(500);
+ ThroughputInterceptor tint2 = new ThroughputInterceptor();
+ tint2.setInterval(500);
+ channel1.addInterceptor(tint);
+ channel2.addInterceptor(tint2);
+ listener1 = new Listener();
+ ReceiverBase rb1 = (ReceiverBase)channel1.getChannelReceiver();
+ ReceiverBase rb2 = (ReceiverBase)channel2.getChannelReceiver();
+ rb1.setUdpPort(50000);
+ rb2.setUdpPort(50000);
+ channel2.addChannelListener(listener1);
+ channel1.start(GroupChannel.DEFAULT);
+ channel2.start(GroupChannel.DEFAULT);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ channel1.stop(GroupChannel.DEFAULT);
+ channel2.stop(GroupChannel.DEFAULT);
+ }
+
+ public void testSingleDataSendNO_ACK() throws Exception {
+ AbstractSender s1 =(AbstractSender)
((ReplicationTransmitter)channel1.getChannelSender()).getTransport();
+ AbstractSender s2 =(AbstractSender)
((ReplicationTransmitter)channel2.getChannelSender()).getTransport();
+ s1.setTimeout(Long.MAX_VALUE); //for debugging
+ s2.setTimeout(Long.MAX_VALUE); //for debugging
+
+ System.err.println("Starting Single package NO_ACK");
+ channel1.send(new Member[] {channel2.getLocalMember(false)},
Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
+ Thread.sleep(500);
+ System.err.println("Finished Single package NO_ACK
["+listener1.count+"]");
+ assertEquals("Checking success messages.",1,listener1.count);
+ }
+
+
+ public void testDataSendNO_ACK() throws Exception {
+ System.err.println("Starting NO_ACK");
+ Thread[] threads = new Thread[threadCount];
+ for (int x=0; x<threads.length; x++ ) {
+ threads[x] = new Thread() {
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; i++) channel1.send(new
Member[] {channel2.getLocalMember(false)},
Data.createRandomData(1024),Channel.SEND_OPTIONS_UDP);
+ System.out.println("Thread["+this.getName()+"] sent
"+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ return;
+ } finally {
+ threadCounter++;
+ }
+ }
+ };
+ }
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+ //wait up to 25 sec (polling every 500 ms) for the remaining messages to arrive
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<25000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+ System.err.println("Finished NO_ACK ["+listener1.count+"]");
+ assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count);
+ }
+
+ public void testDataSendASYNCM() throws Exception {
+ System.err.println("Starting ASYNC MULTI THREAD");
+ Thread[] threads = new Thread[threadCount];
+ for (int x=0; x<threads.length; x++ ) {
+ threads[x] = new Thread() {
+ public void run() {
+ try {
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; i++)
channel1.send(new Member[] {channel2.getLocalMember(false)},
Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);
+ System.out.println("Thread["+this.getName()+"]
sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ return;
+ } finally {
+ threadCounter++;
+ }
+ }
+ };
+ }
+ for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+ for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+ //wait up to 25 sec (polling every 500 ms) for the remaining messages to arrive
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<25000 &&
msgCount*threadCount!=listener1.count) Thread.sleep(500);
+ System.err.println("Finished ASYNC MULTI THREAD
["+listener1.count+"]");
+ assertEquals("Checking success
messages.",msgCount*threadCount,listener1.count);
+ }
+ public void testDataSendASYNC() throws Exception {
+ System.err.println("Starting ASYNC");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS|Channel.SEND_OPTIONS_UDP);
+ //wait up to 5 sec (polling every 500 ms) for the remaining messages to arrive
+ long start = System.currentTimeMillis();
+ while ( (System.currentTimeMillis()-start)<5000 &&
msgCount!=listener1.count) Thread.sleep(500);
+ System.err.println("Finished ASYNC");
+ assertEquals("Checking success messages.",msgCount,listener1.count);
+ }
+
+ public void testDataSendACK() throws Exception {
+ System.err.println("Starting ACK");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
+ Thread.sleep(250);
+ System.err.println("Finished ACK");
+ assertEquals("Checking success messages.",msgCount,listener1.count);
+ }
+
+ public void testDataSendSYNCACK() throws Exception {
+ System.err.println("Starting SYNC_ACK");
+ for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(1024),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_UDP);
+ Thread.sleep(250);
+ System.err.println("Finished SYNC_ACK");
+ assertEquals("Checking success messages.",msgCount,listener1.count);
+ }
+
+ public static class Listener implements ChannelListener {
+ long count = 0;
+ public boolean accept(Serializable s, Member m) {
+ return (s instanceof Data);
+ }
+
+ public void messageReceived(Serializable s, Member m) {
+ Data d = (Data)s;
+ if ( !Data.verify(d) ) {
+ System.err.println("ERROR");
+ } else {
+ count++;
+ if ((count %1000) ==0 ) {
+ System.err.println("SUCCESS:"+count);
+ }
+ }
+ }
+ }
+
+ public static class Data implements Serializable {
+ public int length;
+ public byte[] data;
+ public byte key;
+ public static Random r = new Random(System.currentTimeMillis());
+ public static Data createRandomData() {
+ return createRandomData(ChannelReceiver.MAX_UDP_SIZE);
+ }
+ public static Data createRandomData(int size) {
+ int i = r.nextInt();
+ i = ( i % 127 );
+ int length = Math.abs(r.nextInt() % size);
+ Data d = new Data();
+ d.length = length;
+ d.key = (byte)i;
+ d.data = new byte[length];
+ Arrays.fill(d.data,d.key);
+ return d;
+ }
+
+ public static boolean verify(Data d) {
+ boolean result = (d.length == d.data.length);
+ for ( int i=0; result && (i<d.data.length); i++ ) result = result
&& d.data[i] == d.key;
+ return result;
+ }
+ }
+
+
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]