Author: fhanik
Date: Mon Feb 27 15:01:57 2006
New Revision: 381477
URL: http://svn.apache.org/viewcvs?rev=381477&view=rev
Log:
Added in the ability to use direct byte buffers on the ReplicationListener,
didn't notice much difference
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java
Mon Feb 27 15:01:57 2006
@@ -383,6 +383,7 @@
channel.setChannelListener(test);
channel.setMembershipListener(test);
channel.start(channel.DEFAULT);
+ Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
while ( threads > 1 ) {
Thread t = new Thread(test);
t.setDaemon(true);
@@ -394,6 +395,24 @@
System.out.println("System test complete, sleeping to let threads
finish.");
Thread.sleep(60*1000*60);
- }
+ }
+
+ public static class Shutdown extends Thread {
+ ManagedChannel channel = null;
+ public Shutdown(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ public void run() {
+ System.out.println("Shutting down...");
+ try {
+ channel.stop(channel.DEFAULT);
+
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ System.out.println("Channel stopped.");
+ }
+ }
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java
Mon Feb 27 15:01:57 2006
@@ -80,6 +80,13 @@
* @return number of messages that sended to callback
* @throws java.io.IOException
*/
+ public int append(ByteBuffer data, int len, boolean count) throws
java.io.IOException {
+ buffer.append(data,len);
+ int pkgCnt = -1;
+ if ( count ) pkgCnt = buffer.countPackages();
+ return pkgCnt;
+ }
+
public int append(byte[] data,int off,int len, boolean count) throws
java.io.IOException {
buffer.append(data,off,len);
int pkgCnt = -1;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
Mon Feb 27 15:01:57 2006
@@ -29,6 +29,7 @@
import java.io.Serializable;
import org.apache.catalina.tribes.Member;
import java.util.UUID;
+import java.nio.ByteBuffer;
/**
* The XByteBuffer provides a dual functionality.
@@ -66,12 +67,12 @@
/**
* Default size on the initial byte buffer
*/
- static final int DEF_SIZE = 1024;
+ public static final int DEF_SIZE = 2048;
/**
* Default size to extend the buffer with
*/
- static final int DEF_EXT = 1024;
+ public static final int DEF_EXT = 1024;
/**
* Variable to hold the data
@@ -122,6 +123,27 @@
* @param len - the number of bytes to append.
* @return true if the data was appended correctly. Returns false if the
package is incorrect, ie missing header or something, or the length of data is 0
*/
+ public boolean append(ByteBuffer b, int len) {
+ int newcount = bufSize + len;
+ if (newcount > buf.length) {
+ //don't change the allocation strategy
+ byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, bufSize);
+ buf = newbuf;
+ }
+ b.get(buf,bufSize,len);
+
+ bufSize = newcount;
+
+ if (bufSize > START_DATA.length &&
(firstIndexOf(buf,0,START_DATA)==-1)){
+ bufSize = 0;
+ log.error("Discarded the package, invalid header");
+ return false;
+ }
+ return true;
+
+ }
+
public boolean append(byte[] b, int off, int len) {
if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {
@@ -132,6 +154,7 @@
int newcount = bufSize + len;
if (newcount > buf.length) {
+ //don't change the allocation strategy
byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
System.arraycopy(buf, 0, newbuf, 0, bufSize);
buf = newbuf;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
Mon Feb 27 15:01:57 2006
@@ -31,6 +31,7 @@
import org.apache.catalina.tribes.MessageListener;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.io.ObjectReader;
+import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.util.StringManager;
/**
@@ -38,10 +39,17 @@
* @author Peter Rossbach
* @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb
2006) $
*/
-public class ReplicationListener
- implements Runnable, ChannelReceiver, ListenCallback {
- protected static org.apache.commons.logging.Log log =
-
org.apache.commons.logging.LogFactory.getLog(ReplicationListener.class);
+public class ReplicationListener implements Runnable, ChannelReceiver,
ListenCallback {
+ /**
+ * @todo make this configurable
+ */
+ public static int BUFFER_RECEIVE_SIZE = XByteBuffer.DEF_SIZE;
+ /**
+ * We are only sending acks
+ */
+ public static int BUFFER_SEND_SIZE = 128;
+
+ protected static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(ReplicationListener.class);
/**
* The string manager for this package.
@@ -63,15 +71,13 @@
private int tcpListenPort;
private boolean sendAck;
protected boolean doListen = false;
- /**
- * Compress message data bytes
- */
- private boolean compress = true;
+
private Object interestOpsMutex = new Object();
private MessageListener listener = null;
private boolean sync;
+ private boolean direct;
public ReplicationListener() {
}
@@ -185,8 +191,9 @@
ServerSocketChannel server =
(ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
- Object attach = new ObjectReader(channel, selector,
- this);
+
channel.socket().setReceiveBufferSize(BUFFER_RECEIVE_SIZE);
+ channel.socket().setSendBufferSize(BUFFER_SEND_SIZE);
+ Object attach = new ObjectReader(channel,
selector,this);
registerChannel(selector,
channel,
SelectionKey.OP_READ,
@@ -328,20 +335,6 @@
}
/**
- * @return Returns the compress.
- */
- public boolean isCompress() {
- return compress;
- }
-
- /**
- * @param compressMessageData The compress to set.
- */
- public void setCompress(boolean compressMessageData) {
- this.compress = compressMessageData;
- }
-
- /**
* Send ACK to sender
*
* @return True if sending ACK
@@ -375,6 +368,10 @@
return sync;
}
+ public boolean getDirect() {
+ return direct;
+ }
+
public MessageListener getMessageListener() {
return listener;
}
@@ -383,6 +380,9 @@
this.tcpListenPort = tcpListenPort;
}
+ public void setDirect(boolean direct) {
+ this.direct = direct;
+ }
public void setSynchronized(boolean sync) {
this.sync = sync;
@@ -396,6 +396,7 @@
int options = 0;
if ( getSynchronized() ) options = options
|TcpReplicationThread.OPTION_SYNCHRONIZED;
if ( getSendAck() ) options = options
|TcpReplicationThread.OPTION_SEND_ACK;
+ if ( getDirect() ) options = options |
TcpReplicationThread.OPTION_DIRECT_BUFFER;
return options;
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java?rev=381477&r1=381476&r2=381477&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java
Mon Feb 27 15:01:57 2006
@@ -39,11 +39,12 @@
public class TcpReplicationThread extends WorkerThread {
public static final int OPTION_SEND_ACK = 0x0001;
public static final int OPTION_SYNCHRONIZED = 0x0002;
+ public static final int OPTION_DIRECT_BUFFER = 0x0004;
public static final byte[] ACK_COMMAND = new byte[] {6, 2, 3};
private static org.apache.commons.logging.Log log =
org.apache.commons.logging.LogFactory.getLog(
TcpReplicationThread.class );
- private ByteBuffer buffer = ByteBuffer.allocate (1024);
+ private ByteBuffer buffer = null;
private SelectionKey key;
TcpReplicationThread ()
{
@@ -52,6 +53,13 @@
// loop forever waiting for work to do
public synchronized void run()
{
+ if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
+ System.out.println("Creating a direct buffer");
+ buffer =
ByteBuffer.allocateDirect(ReplicationListener.BUFFER_RECEIVE_SIZE);
+ }else {
+ System.out.println("Creating a regular buffer");
+ buffer = ByteBuffer.allocate
(ReplicationListener.BUFFER_RECEIVE_SIZE);
+ }
while (doRun) {
try {
// sleep and release object lock
@@ -131,7 +139,12 @@
// loop while data available, channel is non-blocking
while ((count = channel.read (buffer)) > 0) {
buffer.flip(); // make buffer readable
- reader.append(buffer.array(),0,count,false);
+ if ( buffer.hasArray() )
+ reader.append(buffer.array(),0,count,false);
+ else
+ reader.append(buffer,count,false);
+
+
buffer.clear(); // make buffer empty
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]