Author: fhanik
Date: Mon Feb 27 13:07:39 2006
New Revision: 381446
URL: http://svn.apache.org/viewcvs?rev=381446&view=rev
Log:
Optimized the serialization of all messaging. ClusterData is now transferred
directly as a byte array.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
Mon Feb 27 13:07:39 2006
@@ -50,6 +50,13 @@
static int messageSize = 0;
+ public static int messagesSent = 0;
+ public static long messageSendTime = 0;
+
+ public static synchronized void addSendStats(int count, long time) {
+ messagesSent+=count;
+ messageSendTime+=time;
+ }
public LoadTest(ManagedChannel channel,
@@ -108,7 +115,10 @@
}
}
if ( (counter % statsInterval) == 0 && (counter > 0)) {
- printSendStats(counter, messageSize, sendTime);
+ //add to the global counter
+ addSendStats(counter,sendTime);
+ //print from the global counter
+ printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime);
}
}
@@ -122,7 +132,7 @@
float cnt = (float)counter;
float size = (float)messageSize;
float time = (float)sendTime / 1000;
- log.info("****SEND STATS*****"+
+ log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
"\n\tMessage count:"+counter+
"\n\tTotal bytes :"+(long)(size*cnt)+
"\n\tTotal seconds:"+(time)+
@@ -179,9 +189,9 @@
if ( (messagesReceived%statsInterval)==0 ||
(messagesReceived==msgCount)) {
float bytes =
(float)(((LoadMessage)msg).getMessage().length*messagesReceived);
float seconds = ((float)(System.currentTimeMillis()-receiveStart))
/ 1000f;
- log.info("****RECEIVE STATS*****"+
+ log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+
"\n\tMessage count :"+(long)messagesReceived+
- "\n\tTotal bytes :"+bytes+
+ "\n\tTotal bytes :"+(long)bytes+
"\n\tTime since 1st:"+seconds+" seconds"+
"\n\tBytes/second :"+(bytes/seconds)+
"\n\tMBytes/second :"+(bytes/seconds/1024f/1024f));
@@ -237,7 +247,7 @@
public byte[] getMessage() {
byte[] data = new byte[size+4];
- System.arraycopy(XByteBuffer.toBytes(msgNr),0,data,0,4);
+ XByteBuffer.toBytes(msgNr,data,0);
if ( message != null ) {
System.arraycopy(message, 0, data, 4, message.length);
}else {
@@ -271,6 +281,7 @@
"[-gzip] \n\t\t"+
"[-pause nrofsecondstopausebetweensends] \n\t\t"+
"[-sender pooled|fastasyncqueue] \n\t\t"+
+ "[-threads numberofsenderthreads] \n\t\t"+
"[-break (halts execution on exception)]\n"+
"Example:\n\t"+
"java LoadTest -port 4004\n\t"+
@@ -291,6 +302,7 @@
int count = 1000000;
int stats = 10000;
boolean breakOnEx = false;
+ int threads = 1;
String sender = "pooled";
if ( args.length == 0 ) {
args = new String[] {"-help"};
@@ -302,6 +314,8 @@
sender = args[++i];
} else if ("-port".equals(args[i])) {
port = Integer.parseInt(args[++i]);
+ } else if ("-threads".equals(args[i])) {
+ threads = Integer.parseInt(args[++i]);
} else if ("-count".equals(args[i])) {
count = Integer.parseInt(args[++i]);
} else if ("-pause".equals(args[i])) {
@@ -366,7 +380,15 @@
channel.setChannelListener(test);
channel.setMembershipListener(test);
channel.start(channel.DEFAULT);
+ while ( threads > 1 ) {
+ Thread t = new Thread(test);
+ t.setDaemon(true);
+ t.start();
+ threads--;
+ test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+ }
test.run();
+
System.out.println("System test complete, sleeping to let threads finish.");
Thread.sleep(60*1000*60);
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
Mon Feb 27 13:07:39 2006
@@ -16,22 +16,22 @@
package org.apache.catalina.tribes.group;
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.ChannelSender;
+import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.MembershipService;
import org.apache.catalina.tribes.io.ClusterData;
import org.apache.catalina.tribes.io.XByteBuffer;
-import java.io.Serializable;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.ManagedChannel;
-import java.util.Iterator;
-import java.util.UUID;
-import org.apache.catalina.tribes.ByteMessage;
/**
* The GroupChannel manages the replication channel. It coordinates
@@ -81,15 +81,6 @@
}
- public byte[] getUUID() {
- UUID id = UUID.randomUUID();
- long msb = id.getMostSignificantBits();
- long lsb = id.getLeastSignificantBits();
- byte[] data = new byte[16];
- System.arraycopy(XByteBuffer.toBytes(msb),0,data,0,8);
- System.arraycopy(XByteBuffer.toBytes(lsb),0,data,8,8);
- return data;
- }
/**
* Send a message to one or more members in the cluster
* @param destination Member[] - the destinations, null or zero length means all
@@ -101,9 +92,8 @@
if ( msg == null ) return;
try {
int options = 0;
- ClusterData data = new ClusterData();
+ ClusterData data = new ClusterData();//generates a unique Id
data.setAddress(getLocalMember());
- data.setUniqueId(getUUID());
data.setTimestamp(System.currentTimeMillis());
byte[] b = null;
if ( msg instanceof ByteMessage ){
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java
Mon Feb 27 13:07:39 2006
@@ -24,6 +24,7 @@
import org.apache.catalina.tribes.mcast.McastMember;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
+import java.util.UUID;
/**
* The cluster data class is used to transport around the byte array from
@@ -42,7 +43,15 @@
private byte[] uniqueId ;
private Member address;
- public ClusterData() {}
+ public ClusterData() {
+ this(true);
+ }
+
+ public ClusterData(boolean generateUUID) {
+ if ( generateUUID ) generateUUID();
+ }
+
+
/**
* @param type message type (class)
@@ -118,6 +127,17 @@
this.address = address;
}
+ public void generateUUID() {
+ UUID id = UUID.randomUUID();
+ long msb = id.getMostSignificantBits();
+ long lsb = id.getLeastSignificantBits();
+ byte[] data = new byte[16];
+ System.arraycopy(XByteBuffer.toBytes(msb),0,data,0,8);
+ System.arraycopy(XByteBuffer.toBytes(lsb),0,data,8,8);
+ setUniqueId(data);
+ }
+
+
/**
*
@@ -130,36 +150,57 @@
* @return byte[]
*/
public byte[] getDataPackage() throws IOException {
- ByteArrayOutputStream bout = new ByteArrayOutputStream(getMessage().length*2);
- ObjectOutputStream out = new ObjectOutputStream(bout);
- out.writeInt(options);
- out.writeLong(timestamp);
- out.writeInt(uniqueId.length);
- out.write(uniqueId);
byte[] addr = ((McastMember)address).getData();
- out.writeInt(addr.length);
- out.write(addr);
- out.writeInt(message.length);
- out.write(message);
- out.flush();
- return bout.toByteArray();
- }
-
- public static ClusterData getDataFromPackage(byte[] dataPackage) throws IOException {
- ByteArrayInputStream bin = new ByteArrayInputStream(dataPackage);
- ObjectInputStream in = new ObjectInputStream(bin);
- ClusterData data = new ClusterData();
- data.setOptions(in.readInt());
- data.setTimestamp(in.readLong());
- byte[] uniqueId = new byte[in.readInt()];
- in.read(uniqueId);
- data.setUniqueId(uniqueId);
- byte[] addr = new byte[in.readInt()];
- in.read(addr);
+ int length =
+ 4 + //options
+ 8 + //timestamp off=4
+ 4 + //unique id length off=12
+ uniqueId.length+ //id data off=12+uniqueId.length
+ 4 + //addr length off=12+uniqueId.length+4
+ addr.length+ //member data off=12+uniqueId.length+4+add.length
+ 4 + //message length off=12+uniqueId.length+4+add.length+4
+ message.length;
+ byte[] data = new byte[length];
+ int offset = 0;
+ XByteBuffer.toBytes(options,data,offset);
+ offset = 4; //options
+ XByteBuffer.toBytes(timestamp,data,offset);
+ offset += 8; //timestamp
+ XByteBuffer.toBytes(uniqueId.length,data,offset);
+ offset += 4; //uniqueId.length
+ System.arraycopy(uniqueId,0,data,offset,uniqueId.length);
+ offset += uniqueId.length; //uniqueId data
+ XByteBuffer.toBytes(addr.length,data,offset);
+ offset += 4; //addr.length
+ System.arraycopy(addr,0,data,offset,addr.length);
+ offset += addr.length; //addr data
+ XByteBuffer.toBytes(message.length,data,offset);
+ offset += 4; //message.length
+ System.arraycopy(message,0,data,offset,message.length);
+ offset += message.length; //message data
+ return data;
+ }
+
+ public static ClusterData getDataFromPackage(byte[] b) throws IOException {
+ ClusterData data = new ClusterData(false);
+ int offset = 0;
+ data.setOptions(XByteBuffer.toInt(b,offset));
+ offset += 4; //options
+ data.setTimestamp(XByteBuffer.toLong(b,offset));
+ offset += 8; //timestamp
+ data.uniqueId = new byte[XByteBuffer.toInt(b,offset)];
+ offset += 4; //uniqueId length
+ System.arraycopy(b,offset,data.uniqueId,0,data.uniqueId.length);
+ offset += data.uniqueId.length; //uniqueId data
+ byte[] addr = new byte[XByteBuffer.toInt(b,offset)];
+ offset += 4; //addr length
+ System.arraycopy(b,offset,addr,0,addr.length);
data.setAddress(McastMember.getMember(addr));
- byte[] message = new byte[in.readInt()];
- in.read(message);
- data.setMessage(message);
+ offset += addr.length; //addr data
+ data.message = new byte[XByteBuffer.toInt(b,offset)];
+ offset += 4; //message length
+ System.arraycopy(b,offset,data.message,0,data.message.length);
+ offset += data.message.length; //message data
return data;
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
Mon Feb 27 13:07:39 2006
@@ -276,14 +276,17 @@
* @return - four bytes in an array
*/
public static byte[] toBytes(int n) {
- byte[] b = new byte[4];
- b[3] = (byte) (n);
+ return toBytes(n,new byte[4],0);
+ }
+
+ public static byte[] toBytes(int n,byte[] b, int offset) {
+ b[offset+3] = (byte) (n);
n >>>= 8;
- b[2] = (byte) (n);
+ b[offset+2] = (byte) (n);
n >>>= 8;
- b[1] = (byte) (n);
+ b[offset+1] = (byte) (n);
n >>>= 8;
- b[0] = (byte) (n);
+ b[offset+0] = (byte) (n);
return b;
}
@@ -293,22 +296,24 @@
* @return - eight bytes in an array
*/
public static byte[] toBytes(long n) {
- byte[] b = new byte[8];
- b[7] = (byte) (n);
+ return toBytes(n,new byte[8],0);
+ }
+ public static byte[] toBytes(long n, byte[] b, int offset) {
+ b[offset+7] = (byte) (n);
n >>>= 8;
- b[6] = (byte) (n);
+ b[offset+6] = (byte) (n);
n >>>= 8;
- b[5] = (byte) (n);
+ b[offset+5] = (byte) (n);
n >>>= 8;
- b[4] = (byte) (n);
+ b[offset+4] = (byte) (n);
n >>>= 8;
- b[3] = (byte) (n);
+ b[offset+3] = (byte) (n);
n >>>= 8;
- b[2] = (byte) (n);
+ b[offset+2] = (byte) (n);
n >>>= 8;
- b[1] = (byte) (n);
+ b[offset+1] = (byte) (n);
n >>>= 8;
- b[0] = (byte) (n);
+ b[offset+0] = (byte) (n);
return b;
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
Mon Feb 27 13:07:39 2006
@@ -64,7 +64,8 @@
/**
* The name of the cluster domain from this node
*/
- private String domain;
+ protected byte[] domain;
+ protected transient String domainname;
/**
* Counter for how many messages have been sent from this member
@@ -101,7 +102,7 @@
long aliveTime) throws IOException {
setHostname(host);
this.port = port;
- this.domain = domain;
+ this.domain = domain.getBytes();
this.memberAliveTime=aliveTime;
}
@@ -141,7 +142,7 @@
//host - 4 bytes
//dlen - 4 bytes
//domain - dlen bytes
- byte[] domaind = getDomain().getBytes();
+ byte[] domaind = this.domain;
byte[] addr = host;
byte[] data = new byte[8+4+addr.length+4+domaind.length];
long alive=System.currentTimeMillis()-getServiceStartTime();
@@ -201,7 +202,8 @@
* @return a cluster domain to the cluster
*/
public String getDomain() {
- return domain;
+ if ( this.domainname == null ) this.domainname = new String(domain);
+ return this.domainname;
}
/**
@@ -256,7 +258,7 @@
* String representation of this object
*/
public String toString() {
- return "org.apache.catalina.tribes.mcast.McastMember["+getName()+","+domain+","+getHostname()+","+port+", alive="+memberAliveTime+"]";
+ return "org.apache.catalina.tribes.mcast.McastMember["+getName()+","+getDomain()+","+getHostname()+","+port+", alive="+memberAliveTime+"]";
}
/**
@@ -355,7 +357,7 @@
}
public void setDomain(String domain) {
- this.domain = domain;
+ this.domain = domain.getBytes();
}
public void setPort(int port) {
this.port = port;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
Mon Feb 27 13:07:39 2006
@@ -72,6 +72,7 @@
* current sender socket
*/
private Socket socket = null;
+ private OutputStream socketout = null;
/**
* is Socket really connected
@@ -698,6 +699,7 @@
*/
protected void createSocket() throws IOException, SocketException {
socket = new Socket(getAddress(), getPort());
+ this.socketout = socket.getOutputStream();
}
/**
@@ -846,9 +848,8 @@
isMessageTransferStarted = true ;
}
try {
- OutputStream out = socket.getOutputStream();
- out.write(XByteBuffer.createDataPackage((ClusterData)data));
- out.flush();
+ socketout.write(XByteBuffer.createDataPackage((ClusterData)data));
+ socketout.flush();
if (isWaitForAck()) waitForAck(ackTimeout);
} finally {
synchronized(this) {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=381446&r1=381445&r2=381446&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
Mon Feb 27 13:07:39 2006
@@ -376,7 +376,7 @@
time = System.currentTimeMillis();
}
try {
- String key = getKey(member);
+ Object key = getKey(member);
IDataSender sender = (IDataSender) map.get(key);
sendMessageData(message, sender);
} finally {
@@ -506,7 +506,7 @@
*/
public synchronized void add(Member member) {
try {
- String key = getKey(member);
+ Object key = getKey(member);
if (!map.containsKey(key)) {
IDataSender sender = IDataSenderFactory.getIDataSender(
replicationMode, member);
@@ -524,7 +524,7 @@
* @see org.apache.catalina.tribes.ClusterSender#remove(org.apache.catalina.tribes.Member)
*/
public synchronized void remove(Member member) {
- String key = getKey(member);
+ Object key = getKey(member);
IDataSender toberemoved = (IDataSender) map.get(key);
if (toberemoved == null)
return;
@@ -570,8 +570,8 @@
* @param member
* @return concat member.host:member.port
*/
- protected String getKey(Member member) {
- return member.getHost() + ":" + member.getPort();
+ protected Object getKey(Member member) {
+ return member;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]