Author: fhanik Date: Thu Mar 2 07:32:45 2006 New Revision: 382412 URL: http://svn.apache.org/viewcvs?rev=382412&view=rev Log: Worked on the NIO datasender. The state machine is complete; time to start working on parallelism.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=382412&r1=382411&r2=382412&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Thu Mar 2 07:32:45 2006 @@ -238,12 +238,16 @@ return true; } - private void expand(int newcount) { + public void expand(int newcount) { //don't change the allocation strategy byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; System.arraycopy(buf, 0, newbuf, 0, bufSize); buf = newbuf; } + + public int getCapacity() { + return buf.length; + } /** @@ -285,7 +289,7 @@ /** * Method to check if a package exists in this byte buffer. - * @return - true if a complete package (header,compress,size,data,footer) exists within the buffer + * @return - true if a complete package (header,options,size,data,footer) exists within the buffer */ public boolean doesPackageExist() { return (countPackages()>0); @@ -297,19 +301,24 @@ * @param clearFromBuffer - if true, the package will be removed from the byte buffer * @return - returns the actual message bytes (header, compress,size and footer not included). 
*/ - public ClusterData extractPackage(boolean clearFromBuffer) - throws java.io.IOException { + public byte[] extractDataPackage(boolean clearFromBuffer) { int psize = countPackages(); if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer"); int size = toInt(buf, START_DATA.length); byte[] data = new byte[size]; System.arraycopy(buf, START_DATA.length + 4, data, 0, size); - ClusterData cdata = ClusterData.getDataFromPackage(data); if (clearFromBuffer) { int totalsize = START_DATA.length + 4 + size + END_DATA.length; bufSize = bufSize - totalsize; System.arraycopy(buf, totalsize, buf, 0, bufSize); } + return data; + + } + + public ClusterData extractPackage(boolean clearFromBuffer) throws java.io.IOException { + byte[] data = extractDataPackage(clearFromBuffer); + ClusterData cdata = ClusterData.getDataFromPackage(data); return cdata; } @@ -324,20 +333,38 @@ return createDataPackage(cdata.getDataPackage()); } - public static byte[] createDataPackage(byte[] data) { + public static int getDataPackageLength(int datalength) { int length = START_DATA.length + //header length 4 + //data length indicator - data.length + //actual data length + datalength + //actual data length END_DATA.length; //footer length + return length; + + } + + public static byte[] createDataPackage(byte[] data) { + int length = getDataPackageLength(data.length); byte[] result = new byte[length]; - System.arraycopy(START_DATA, 0, result, 0, START_DATA.length); - System.arraycopy(toBytes(data.length), 0, result, START_DATA.length, 4); - System.arraycopy(data, 0, result, START_DATA.length + 4, data.length); - System.arraycopy(END_DATA, 0, result, START_DATA.length + 4 + data.length, END_DATA.length); - return result; + return createDataPackage(data,0,data.length,result,0); + } + + public static byte[] createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff) { + if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) { + throw new 
ArrayIndexOutOfBoundsException("Unable to create data package, buffer is too small."); + } + System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length); + System.arraycopy(toBytes(data.length), 0, buffer, bufoff+START_DATA.length, 4); + System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength); + System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length); + return buffer; } + public static void fillDataPackage(byte[] data, int doff, int dlength, XByteBuffer buf) { + int pkglen = getDataPackageLength(dlength); + if ( buf.getCapacity() < pkglen ) buf.expand(pkglen); + createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength()); + } /** * Convert four bytes to an int Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382412&r1=382411&r2=382412&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar 2 07:32:45 2006 @@ -26,6 +26,7 @@ import org.apache.catalina.tribes.io.ClusterData; import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.util.StringManager; +import java.util.Arrays; /** * Send cluster messages with only one socket. 
Ack and keep Alive Handling is @@ -921,7 +922,7 @@ byte d = (byte)i; ackbuf.append(d); if (ackbuf.doesPackageExist() ) { - ackReceived = true; + ackReceived = Arrays.equals(ackbuf.extractDataPackage(true),Constants.ACK_DATA); break; } i = socket.getInputStream().read(); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java?rev=382412&r1=382411&r2=382412&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java Thu Mar 2 07:32:45 2006 @@ -26,6 +26,9 @@ import java.nio.channels.SelectionKey; import java.nio.ByteBuffer; import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.Member; +import java.util.Arrays; +import org.apache.catalina.tribes.io.ClusterData; /** * This class is NOT thread safe and should never be used with more than one thread at a time @@ -47,25 +50,29 @@ protected long ackTimeout = 15000; - protected InetAddress address; protected String domain = ""; - protected int port; protected boolean suspect = false; protected boolean connected = false; protected boolean waitForAck = false; protected int rxBufSize = 25188; protected int txBufSize = 43800; protected Selector selector; - + protected Member destination; protected SocketChannel socketChannel; - protected ByteBuffer buf = null; + + /* + * STATE VARIABLES * + */ + protected ByteBuffer readbuf = null; protected boolean direct = false; - protected ChannelMessage current = null; + protected byte[] current = null; protected int curPos=0; protected XByteBuffer ackbuf = new XByteBuffer(128,true); + protected int remaining = 0; - public NioSender() { + 
public NioSender(Member destination) { + this.destination = destination; } @@ -76,7 +83,11 @@ if ( socketChannel.finishConnect() ) { //we connected, register ourselves for writing this.connected = true; - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + return false; + } else { + //wait for the connection to finish + key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT); return false; } } else if ( key.isWritable() ) { @@ -84,7 +95,7 @@ if ( writecomplete ) { //we are completed, should we read an ack? if ( waitForAck ) key.interestOps(key.interestOps()|SelectionKey.OP_READ); - //if not, we are ready, setMessage will reregister us for write interest + //if not, we are ready, setMessage will reregister us for another write interest else return true; } else { //we are not complete, lets write some more @@ -106,12 +117,19 @@ protected boolean read(SelectionKey key) throws IOException { //if there is no message here, we are done if ( current == null ) return true; - int read = socketChannel.read(buf); + int read = socketChannel.read(readbuf); //end of stream - if ( read == -1 ) return true; + if ( read == -1 ) throw new IOException("Unable to receive an ack message."); //no data read else if ( read == 0 ) return false; - throw new UnsupportedOperationException(); + readbuf.flip(); + ackbuf.append(readbuf,read); + readbuf.clear(); + if (ackbuf.doesPackageExist() ) { + return Arrays.equals(ackbuf.extractDataPackage(true),Constants.ACK_DATA); + } else { + return false; + } } @@ -120,25 +138,20 @@ throw new IOException("NioSender is not connected, this should not occur."); } if ( current != null ) { - int remaining = buf.remaining(); if ( remaining > 0 ) { - //write the rest of the buffer - remaining -= socketChannel.write(buf); - } - if ( remaining == 0 ) { //weve written everything, or we are starting a new package - XByteBuffer msg = current.getMessage(); - remaining 
= msg.getLength() - curPos; - buf.clear(); //protect against buffer overwrite - int length = Math.min(remaining,txBufSize); - buf.put(msg.getBytesDirect(),curPos,length); - - //if the entire message fits in the buffer + int length = current.length-curPos; + ByteBuffer writebuf = ByteBuffer.wrap(current,curPos,length); + int byteswritten = socketChannel.write(writebuf); + curPos += byteswritten; + remaining -= byteswritten; + //if the entire message was written from the buffer //reset the position counter - curPos += length; - if ( curPos >= msg.getLength() ) curPos = 0; - remaining -= socketChannel.write(buf); + if ( curPos >= current.length ) { + curPos = 0; + remaining = 0; + } } //the write return (remaining==0 && curPos == 0); @@ -155,11 +168,12 @@ */ public synchronized void connect() throws IOException { if ( connected ) throw new IOException("NioSender is already in connected state."); - if ( buf == null ) { - if ( direct ) buf = ByteBuffer.allocateDirect(txBufSize); - else buf = ByteBuffer.allocate(txBufSize); + if ( readbuf == null ) { + readbuf = getReadBuffer(); + } else { + readbuf.clear(); } - InetSocketAddress addr = new InetSocketAddress(address,port); + InetSocketAddress addr = new InetSocketAddress(InetAddress.getByAddress(destination.getHost()),destination.getPort()); if ( socketChannel != null ) throw new IOException("Socket channel has already been established. 
Connection might be in progress."); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); @@ -177,14 +191,29 @@ public void disconnect() { try { this.connected = false; - if ( buf != null ) buf.clear(); socketChannel.close(); socketChannel = null; - curPos = 0; } catch ( Exception x ) { log.error("Unable to disconnect.",x); + } finally { + reset(); + } + + } + + public void reset() { + if ( connected && readbuf == null) { + readbuf = getReadBuffer(); } + if ( readbuf != null ) readbuf.clear(); + current = null; + curPos = 0; + ackbuf.clear(); + remaining = 0; + } + private ByteBuffer getReadBuffer() { + return (direct?ByteBuffer.allocateDirect(rxBufSize):ByteBuffer.allocate(rxBufSize)); } /** @@ -195,14 +224,15 @@ * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method */ public synchronized void setMessage(ChannelMessage data) throws IOException { - this.current = data; + reset(); if ( data != null ) { - if (!this.connected) { - connect(); - } else { + current = XByteBuffer.createDataPackage((ClusterData)data); + remaining = current.length; + curPos = 0; + if (connected) { socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); } - } + } } @@ -226,35 +256,7 @@ return this.ackTimeout; } - /** - * getAddress - * - * @return InetAddress - * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method - */ - public InetAddress getAddress() { - return address; - } - - /** - * getDomain - * - * @return String - * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method - */ - public String getDomain() { - return domain; - } - - /** - * getPort - * - * @return int - * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method - */ - public int getPort() { - return port; - } + /** * getSuspect @@ -303,36 +305,6 @@ */ public void setAckTimeout(long timeout) { this.ackTimeout = timeout; - } - - /** - * setAddress - * - * @param address InetAddress - * @todo Implement this 
org.apache.catalina.tribes.tcp.IDataSender method - */ - public void setAddress(InetAddress address) { - this.address = address; - } - - /** - * setDomain - * - * @param domain String - * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method - */ - public void setDomain(String domain) { - this.domain = domain; - } - - /** - * setPort - * - * @param port int - * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method - */ - public void setPort(int port) { - this.port = port; } /** Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java?rev=382412&r1=382411&r2=382412&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java Thu Mar 2 07:32:45 2006 @@ -17,13 +17,8 @@ package org.apache.catalina.tribes.tcp; /** - * <p>Title: </p> - * - * <p>Description: </p> - * - * <p>Copyright: Copyright (c) 2005</p> - * - * <p>Company: </p> + * A class that uses NIO to send data in parallel to several remote nodes. + * * * @author Filip Hanik * @version 1.0 --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]