Author: fhanik
Date: Thu Mar  2 07:32:45 2006
New Revision: 382412

URL: http://svn.apache.org/viewcvs?rev=382412&view=rev
Log:
Worked on the NIO datasender, state machine is complete, time to start working 
on parallelism

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=382412&r1=382411&r2=382412&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Thu Mar  2 07:32:45 2006
@@ -238,12 +238,16 @@
         return true;
     }
 
-    private void expand(int newcount) {
+    public void expand(int newcount) {
         //don't change the allocation strategy
         byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
         System.arraycopy(buf, 0, newbuf, 0, bufSize);
         buf = newbuf;
     }
+    
+    public int getCapacity() {
+        return buf.length;
+    }
 
 
     /**
@@ -285,7 +289,7 @@
 
     /**
      * Method to check if a package exists in this byte buffer.
-     * @return - true if a complete package (header,compress,size,data,footer) 
exists within the buffer
+     * @return - true if a complete package (header,options,size,data,footer) 
exists within the buffer
      */
     public boolean doesPackageExist()  {
         return (countPackages()>0);
@@ -297,19 +301,24 @@
      * @param clearFromBuffer - if true, the package will be removed from the 
byte buffer
      * @return - returns the actual message bytes (header, compress,size and 
footer not included).
      */
-    public ClusterData extractPackage(boolean clearFromBuffer)
-            throws java.io.IOException {
+    public byte[] extractDataPackage(boolean clearFromBuffer) {
         int psize = countPackages();
         if (psize == 0) throw new java.lang.IllegalStateException("No package 
exists in XByteBuffer");
         int size = toInt(buf, START_DATA.length);
         byte[] data = new byte[size];
         System.arraycopy(buf, START_DATA.length + 4, data, 0, size);
-        ClusterData cdata = ClusterData.getDataFromPackage(data);
         if (clearFromBuffer) {
             int totalsize = START_DATA.length + 4 + size + END_DATA.length;
             bufSize = bufSize - totalsize;
             System.arraycopy(buf, totalsize, buf, 0, bufSize);
         }
+        return data;
+
+    }
+    
+    public ClusterData extractPackage(boolean clearFromBuffer) throws 
java.io.IOException {
+        byte[] data = extractDataPackage(clearFromBuffer);
+        ClusterData cdata = ClusterData.getDataFromPackage(data);
         return cdata;
     }
     
@@ -324,20 +333,38 @@
         return createDataPackage(cdata.getDataPackage());
     }
     
-    public static byte[] createDataPackage(byte[] data) {
+    public static int getDataPackageLength(int datalength) {
         int length = 
             START_DATA.length + //header length
             4 + //data length indicator
-            data.length + //actual data length
+            datalength + //actual data length
             END_DATA.length; //footer length
+        return length;
+
+    }
+    
+    public static byte[] createDataPackage(byte[] data) {
+        int length = getDataPackageLength(data.length);
         byte[] result = new byte[length];
-        System.arraycopy(START_DATA, 0, result, 0, START_DATA.length);
-        System.arraycopy(toBytes(data.length), 0, result, START_DATA.length, 
4);
-        System.arraycopy(data, 0, result, START_DATA.length + 4, data.length);
-        System.arraycopy(END_DATA, 0, result, START_DATA.length + 4 + 
data.length, END_DATA.length);
-        return result;
+        return createDataPackage(data,0,data.length,result,0);
+    }
+    
+    public static byte[] createDataPackage(byte[] data, int doff, int dlength, 
byte[] buffer, int bufoff) {
+        if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) {
+            throw new ArrayIndexOutOfBoundsException("Unable to create data 
package, buffer is too small.");
+        }
+        System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length);
+        System.arraycopy(toBytes(data.length), 0, buffer, 
bufoff+START_DATA.length, 4);
+        System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, 
dlength);
+        System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + 
data.length, END_DATA.length);
+        return buffer;
     }
 
+    public static void fillDataPackage(byte[] data, int doff, int dlength, 
XByteBuffer buf) {
+        int pkglen = getDataPackageLength(dlength);
+        if ( buf.getCapacity() <  pkglen ) buf.expand(pkglen);
+        
createDataPackage(data,doff,dlength,buf.getBytesDirect(),buf.getLength());
+    }
 
     /**
      * Convert four bytes to an int

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382412&r1=382411&r2=382412&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
 Thu Mar  2 07:32:45 2006
@@ -26,6 +26,7 @@
 import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.util.StringManager;
+import java.util.Arrays;
 
 /**
  * Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -921,7 +922,7 @@
                 byte d = (byte)i;
                 ackbuf.append(d);
                 if (ackbuf.doesPackageExist() ) {
-                    ackReceived = true;
+                    ackReceived = 
Arrays.equals(ackbuf.extractDataPackage(true),Constants.ACK_DATA);
                     break;
                 }
                 i = socket.getInputStream().read();

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java?rev=382412&r1=382411&r2=382412&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
 Thu Mar  2 07:32:45 2006
@@ -26,6 +26,9 @@
 import java.nio.channels.SelectionKey;
 import java.nio.ByteBuffer;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.Member;
+import java.util.Arrays;
+import org.apache.catalina.tribes.io.ClusterData;
 
 /**
  * This class is NOT thread safe and should never be used with more than one 
thread at a time
@@ -47,25 +50,29 @@
 
     
     protected long ackTimeout = 15000;
-    protected InetAddress address;
     protected String domain = "";
-    protected int port;
     protected boolean suspect = false;
     protected boolean connected = false;
     protected boolean waitForAck = false;
     protected int rxBufSize = 25188;
     protected int txBufSize = 43800;
     protected Selector selector;
-    
+    protected Member destination;
     
     protected SocketChannel socketChannel;
-    protected ByteBuffer buf = null;
+
+    /*
+     * STATE VARIABLES *
+     */
+    protected ByteBuffer readbuf = null;
     protected boolean direct = false;
-    protected ChannelMessage current = null;
+    protected byte[] current = null;
     protected int curPos=0;
     protected XByteBuffer ackbuf = new XByteBuffer(128,true);
+    protected int remaining = 0;
 
-    public NioSender() {
+    public NioSender(Member destination) {
+        this.destination = destination;
         
     }
     
@@ -76,7 +83,11 @@
             if ( socketChannel.finishConnect() ) {
                 //we connected, register ourselves for writing
                 this.connected = true;
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                if ( current != null ) key.interestOps(key.interestOps() | 
SelectionKey.OP_WRITE);
+                return false;
+            } else  { 
+                //wait for the connection to finish
+                key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
                 return false;
             }
         } else if ( key.isWritable() ) {
@@ -84,7 +95,7 @@
             if ( writecomplete ) {
                 //we are completed, should we read an ack?
                 if ( waitForAck ) 
key.interestOps(key.interestOps()|SelectionKey.OP_READ);
-                //if not, we are ready, setMessage will reregister us for 
write interest
+                //if not, we are ready, setMessage will reregister us for 
another write interest
                 else return true;
             } else {
                 //we are not complete, lets write some more
@@ -106,12 +117,19 @@
     protected boolean read(SelectionKey key) throws IOException {
         //if there is no message here, we are done
         if ( current == null ) return true;
-        int read = socketChannel.read(buf);
+        int read = socketChannel.read(readbuf);
         //end of stream
-        if ( read == -1 ) return true;
+        if ( read == -1 ) throw new IOException("Unable to receive an ack 
message.");
         //no data read
         else if ( read == 0 ) return false;
-        throw new UnsupportedOperationException();
+        readbuf.flip();
+        ackbuf.append(readbuf,read);
+        readbuf.clear();
+        if (ackbuf.doesPackageExist() ) {
+            return 
Arrays.equals(ackbuf.extractDataPackage(true),Constants.ACK_DATA);
+        } else {
+            return false;
+        }
     }
 
     
@@ -120,25 +138,20 @@
             throw new IOException("NioSender is not connected, this should not 
occur.");
         }
         if ( current != null ) {
-            int remaining = buf.remaining();
             if ( remaining > 0 ) {
-                //write the rest of the buffer
-                remaining -= socketChannel.write(buf);
-            }            
-            if ( remaining == 0 ) {
                 //weve written everything, or we are starting a new package
-                XByteBuffer msg = current.getMessage();
-                remaining = msg.getLength() - curPos;
-                buf.clear();
                 //protect against buffer overwrite
-                int length = Math.min(remaining,txBufSize);
-                buf.put(msg.getBytesDirect(),curPos,length);
-                
-                //if the entire message fits in the buffer
+                int length = current.length-curPos;
+                ByteBuffer writebuf = ByteBuffer.wrap(current,curPos,length);
+                int byteswritten = socketChannel.write(writebuf);
+                curPos += byteswritten;
+                remaining -= byteswritten;
+                //if the entire message was written from the buffer
                 //reset the position counter
-                curPos += length;
-                if ( curPos >= msg.getLength() ) curPos = 0;
-                remaining -= socketChannel.write(buf);
+                if ( curPos >= current.length ) {
+                    curPos = 0;
+                    remaining = 0;
+                }
             }
             //the write 
             return (remaining==0 && curPos == 0);
@@ -155,11 +168,12 @@
      */
     public synchronized void connect() throws IOException {
         if ( connected ) throw new IOException("NioSender is already in 
connected state.");
-        if ( buf == null ) {
-            if ( direct ) buf = ByteBuffer.allocateDirect(txBufSize);
-            else buf = ByteBuffer.allocate(txBufSize);
+        if ( readbuf == null ) {
+            readbuf = getReadBuffer();
+        } else {
+            readbuf.clear();
         }
-        InetSocketAddress addr = new InetSocketAddress(address,port);
+        InetSocketAddress addr = new 
InetSocketAddress(InetAddress.getByAddress(destination.getHost()),destination.getPort());
         if ( socketChannel != null ) throw new IOException("Socket channel has 
already been established. Connection might be in progress.");
         socketChannel = SocketChannel.open();
         socketChannel.configureBlocking(false);
@@ -177,14 +191,29 @@
     public void disconnect() {
         try {
             this.connected = false;
-            if ( buf != null ) buf.clear();
             socketChannel.close();
             socketChannel = null;
-            curPos = 0;
         } catch ( Exception x ) {
             log.error("Unable to disconnect.",x);
+        } finally {
+            reset();
+        }
+
+    }
+    
+    public void reset() {
+        if ( connected && readbuf == null) {
+            readbuf = getReadBuffer();
         }
+        if ( readbuf != null ) readbuf.clear();
+        current = null;
+        curPos = 0;
+        ackbuf.clear();
+        remaining = 0;
+    }
 
+    private ByteBuffer getReadBuffer() {
+        return 
(direct?ByteBuffer.allocateDirect(rxBufSize):ByteBuffer.allocate(rxBufSize));
     }
     
     /**
@@ -195,14 +224,15 @@
     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
     */
    public synchronized void setMessage(ChannelMessage data) throws IOException 
{
-       this.current = data;
+       reset();
        if ( data != null ) {
-           if (!this.connected) {
-               connect();
-           } else {
+           current = XByteBuffer.createDataPackage((ClusterData)data);
+           remaining = current.length;
+           curPos = 0;
+           if (connected) {
                socketChannel.register(getSelector(), SelectionKey.OP_WRITE, 
this);
            }
-       }
+       } 
    }
 
 
@@ -226,35 +256,7 @@
         return this.ackTimeout;
     }
 
-    /**
-     * getAddress
-     *
-     * @return InetAddress
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public InetAddress getAddress() {
-        return address;
-    }
-
-    /**
-     * getDomain
-     *
-     * @return String
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public String getDomain() {
-        return domain;
-    }
-
-    /**
-     * getPort
-     *
-     * @return int
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public int getPort() {
-        return port;
-    }
+    
 
     /**
      * getSuspect
@@ -303,36 +305,6 @@
      */
     public void setAckTimeout(long timeout) {
         this.ackTimeout = timeout;
-    }
-
-    /**
-     * setAddress
-     *
-     * @param address InetAddress
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public void setAddress(InetAddress address) {
-        this.address = address;
-    }
-
-    /**
-     * setDomain
-     *
-     * @param domain String
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public void setDomain(String domain) {
-        this.domain = domain;
-    }
-
-    /**
-     * setPort
-     *
-     * @param port int
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public void setPort(int port) {
-        this.port = port;
     }
 
     /**

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java?rev=382412&r1=382411&r2=382412&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
 Thu Mar  2 07:32:45 2006
@@ -17,13 +17,8 @@
 package org.apache.catalina.tribes.tcp;
 
 /**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Copyright: Copyright (c) 2005</p>
- *
- * <p>Company: </p>
+ * A class that uses NIO to send data in parallel to several remote nodes.
+ * 
  *
  * @author Filip Hanik
  * @version 1.0



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to