Author: fhanik Date: Mon Feb 27 15:01:57 2006 New Revision: 381477 URL: http://svn.apache.org/viewcvs?rev=381477&view=rev Log: Added the ability to use direct byte buffers in the ReplicationListener; didn't notice much difference.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java?rev=381477&r1=381476&r2=381477&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/demos/LoadTest.java Mon Feb 27 15:01:57 2006 @@ -383,6 +383,7 @@ channel.setChannelListener(test); channel.setMembershipListener(test); channel.start(channel.DEFAULT); + Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); while ( threads > 1 ) { Thread t = new Thread(test); t.setDaemon(true); @@ -394,6 +395,24 @@ System.out.println("System test complete, sleeping to let threads finish."); Thread.sleep(60*1000*60); - } + } + + public static class Shutdown extends Thread { + ManagedChannel channel = null; + public Shutdown(ManagedChannel channel) { + this.channel = channel; + } + + public void run() { + System.out.println("Shutting down..."); + try { + channel.stop(channel.DEFAULT); + + }catch ( Exception x ) { + x.printStackTrace(); + } + System.out.println("Channel stopped."); + } + } } Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java?rev=381477&r1=381476&r2=381477&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ObjectReader.java Mon Feb 27 15:01:57 2006 @@ -80,6 +80,13 @@ * @return number of messages that sended to callback * @throws java.io.IOException */ + public int append(ByteBuffer data, int len, boolean count) throws java.io.IOException { + buffer.append(data,len); + int pkgCnt = -1; + if ( count ) pkgCnt = buffer.countPackages(); + return pkgCnt; + } + public int append(byte[] data,int off,int len, boolean count) throws java.io.IOException { buffer.append(data,off,len); int pkgCnt = -1; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=381477&r1=381476&r2=381477&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Mon Feb 27 15:01:57 2006 @@ -29,6 +29,7 @@ import java.io.Serializable; import org.apache.catalina.tribes.Member; import java.util.UUID; +import java.nio.ByteBuffer; /** * The XByteBuffer provides a dual functionality. 
@@ -66,12 +67,12 @@ /** * Default size on the initial byte buffer */ - static final int DEF_SIZE = 1024; + public static final int DEF_SIZE = 2048; /** * Default size to extend the buffer with */ - static final int DEF_EXT = 1024; + public static final int DEF_EXT = 1024; /** * Variable to hold the data @@ -122,6 +123,27 @@ * @param len - the number of bytes to append. * @return true if the data was appended correctly. Returns false if the package is incorrect, ie missing header or something, or the length of data is 0 */ + public boolean append(ByteBuffer b, int len) { + int newcount = bufSize + len; + if (newcount > buf.length) { + //don't change the allocation strategy + byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; + System.arraycopy(buf, 0, newbuf, 0, bufSize); + buf = newbuf; + } + b.get(buf,bufSize,len); + + bufSize = newcount; + + if (bufSize > START_DATA.length && (firstIndexOf(buf,0,START_DATA)==-1)){ + bufSize = 0; + log.error("Discarded the package, invalid header"); + return false; + } + return true; + + } + public boolean append(byte[] b, int off, int len) { if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { @@ -132,6 +154,7 @@ int newcount = bufSize + len; if (newcount > buf.length) { + //don't change the allocation strategy byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; System.arraycopy(buf, 0, newbuf, 0, bufSize); buf = newbuf; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java?rev=381477&r1=381476&r2=381477&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java (original) +++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java Mon Feb 27 15:01:57 2006 @@ -31,6 +31,7 @@ import org.apache.catalina.tribes.MessageListener; import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.io.ObjectReader; +import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.util.StringManager; /** @@ -38,10 +39,17 @@ * @author Peter Rossbach * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $ */ -public class ReplicationListener - implements Runnable, ChannelReceiver, ListenCallback { - protected static org.apache.commons.logging.Log log = - org.apache.commons.logging.LogFactory.getLog(ReplicationListener.class); +public class ReplicationListener implements Runnable, ChannelReceiver, ListenCallback { + /** + * @todo make this configurable + */ + public static int BUFFER_RECEIVE_SIZE = XByteBuffer.DEF_SIZE; + /** + * We are only sending acks + */ + public static int BUFFER_SEND_SIZE = 128; + + protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ReplicationListener.class); /** * The string manager for this package. 
@@ -63,15 +71,13 @@ private int tcpListenPort; private boolean sendAck; protected boolean doListen = false; - /** - * Compress message data bytes - */ - private boolean compress = true; + private Object interestOpsMutex = new Object(); private MessageListener listener = null; private boolean sync; + private boolean direct; public ReplicationListener() { } @@ -185,8 +191,9 @@ ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); - Object attach = new ObjectReader(channel, selector, - this); + channel.socket().setReceiveBufferSize(BUFFER_RECEIVE_SIZE); + channel.socket().setSendBufferSize(BUFFER_SEND_SIZE); + Object attach = new ObjectReader(channel, selector,this); registerChannel(selector, channel, SelectionKey.OP_READ, @@ -328,20 +335,6 @@ } /** - * @return Returns the compress. - */ - public boolean isCompress() { - return compress; - } - - /** - * @param compressMessageData The compress to set. - */ - public void setCompress(boolean compressMessageData) { - this.compress = compressMessageData; - } - - /** * Send ACK to sender * * @return True if sending ACK @@ -375,6 +368,10 @@ return sync; } + public boolean getDirect() { + return direct; + } + public MessageListener getMessageListener() { return listener; } @@ -383,6 +380,9 @@ this.tcpListenPort = tcpListenPort; } + public void setDirect(boolean direct) { + this.direct = direct; + } public void setSynchronized(boolean sync) { this.sync = sync; @@ -396,6 +396,7 @@ int options = 0; if ( getSynchronized() ) options = options |TcpReplicationThread.OPTION_SYNCHRONIZED; if ( getSendAck() ) options = options |TcpReplicationThread.OPTION_SEND_ACK; + if ( getDirect() ) options = options | TcpReplicationThread.OPTION_DIRECT_BUFFER; return options; } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java?rev=381477&r1=381476&r2=381477&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/TcpReplicationThread.java Mon Feb 27 15:01:57 2006 @@ -39,11 +39,12 @@ public class TcpReplicationThread extends WorkerThread { public static final int OPTION_SEND_ACK = 0x0001; public static final int OPTION_SYNCHRONIZED = 0x0002; + public static final int OPTION_DIRECT_BUFFER = 0x0004; public static final byte[] ACK_COMMAND = new byte[] {6, 2, 3}; private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( TcpReplicationThread.class ); - private ByteBuffer buffer = ByteBuffer.allocate (1024); + private ByteBuffer buffer = null; private SelectionKey key; TcpReplicationThread () { @@ -52,6 +53,13 @@ // loop forever waiting for work to do public synchronized void run() { + if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) { + System.out.println("Creating a direct buffer"); + buffer = ByteBuffer.allocateDirect(ReplicationListener.BUFFER_RECEIVE_SIZE); + }else { + System.out.println("Creating a regular buffer"); + buffer = ByteBuffer.allocate (ReplicationListener.BUFFER_RECEIVE_SIZE); + } while (doRun) { try { // sleep and release object lock @@ -131,7 +139,12 @@ // loop while data available, channel is non-blocking while ((count = channel.read (buffer)) > 0) { buffer.flip(); // make buffer readable - reader.append(buffer.array(),0,count,false); + if ( buffer.hasArray() ) + reader.append(buffer.array(),0,count,false); + else + reader.append(buffer,count,false); + + buffer.clear(); // make buffer empty } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]