Author: fhanik
Date: Thu Mar  2 10:47:01 2006
New Revision: 382464

URL: http://svn.apache.org/viewcvs?rev=382464&view=rev
Log:
Initial skeleton of the ParallelNioSender has been completed. This little piece 
of code has the ability to transmit to multiple remote destinations in parallel 
since it uses non blocking IO and only one thread.

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java?rev=382464&r1=382463&r2=382464&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/NioSender.java
 Thu Mar  2 10:47:01 2006
@@ -19,17 +19,15 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.util.Arrays;
 
-import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.io.ClusterData;
 import org.apache.catalina.tribes.io.XByteBuffer;
-import java.net.Socket;
 
 /**
  * This class is NOT thread safe and should never be used with more than one 
thread at a time
@@ -49,9 +47,6 @@
 
     protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(NioSender.class);
 
-    
-    protected long ackTimeout = 15000;
-    protected String domain = "";
     protected boolean suspect = false;
     protected boolean connected = false;
     protected boolean waitForAck = false;
@@ -72,6 +67,8 @@
     protected int curPos=0;
     protected XByteBuffer ackbuf = new XByteBuffer(128,true);
     protected int remaining = 0;
+    private boolean complete;
+    private int attempt;
 
     public NioSender(Member destination) {
         this.destination = destination;
@@ -204,13 +201,15 @@
     public void disconnect() {
         try {
             this.connected = false;
-            Socket socket = socketChannel.socket();
-            socket.shutdownOutput();
-            socket.shutdownInput();
-            socket.close();
-            socketChannel.close();
-            socket = null;
-            socketChannel = null;
+            if ( socketChannel != null ) {
+                Socket socket = socketChannel.socket();
+                socket.shutdownOutput();
+                socket.shutdownInput();
+                socket.close();
+                socketChannel.close();
+                socket = null;
+                socketChannel = null;
+            }
         } catch ( Exception x ) {
             log.error("Unable to disconnect.",x);
         } finally {
@@ -228,6 +227,8 @@
         curPos = 0;
         ackbuf.clear();
         remaining = 0;
+        complete = false;
+        attempt = 0;
     }
 
     private ByteBuffer getReadBuffer() {
@@ -252,6 +253,10 @@
            }
        } 
    }
+   
+   public byte[] getMessage() {
+       return current;
+   }
 
 
     /**
@@ -263,19 +268,6 @@
     public boolean checkKeepAlive() {
         return false;
     }
-
-    /**
-     * getAckTimeout
-     *
-     * @return long
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public long getAckTimeout() {
-        return this.ackTimeout;
-    }
-
-    
-
     /**
      * getSuspect
      *
@@ -313,14 +305,17 @@
     public boolean getDirect() {
         return direct;
     }
-    /**
-     * setAckTimeout
-     *
-     * @param timeout long
-     * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
-     */
-    public void setAckTimeout(long timeout) {
-        this.ackTimeout = timeout;
+
+    public Member getDestination() {
+        return destination;
+    }
+
+    public boolean isComplete() {
+        return complete;
+    }
+
+    public int getAttempt() {
+        return attempt;
     }
 
     /**
@@ -369,5 +364,13 @@
 
     public void setDirect(boolean directBuffer) {
         this.direct = directBuffer;
+    }
+
+    public void setComplete(boolean complete) {
+        this.complete = complete;
+    }
+
+    public void setAttempt(int attempt) {
+        this.attempt = attempt;
     }
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java?rev=382464&r1=382463&r2=382464&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ParallelNioSender.java
 Thu Mar  2 10:47:01 2006
@@ -16,11 +16,17 @@
 package org.apache.catalina.tribes.tcp;
 
 
+import java.io.IOException;
+import java.nio.channels.Selector;
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.io.ClusterData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.Iterator;
+import java.nio.channels.SelectionKey;
 
 /**
  * <p>Title: </p>
@@ -35,22 +41,168 @@
  * @version 1.0
  */
 public class ParallelNioSender {
-    protected long timeout;
-    protected boolean waitForAck;
-    
-    public ParallelNioSender(long timeout, boolean waitForAck) {
+    protected long timeout = 15000;
+    protected long selectTimeout = 50; 
+    protected boolean waitForAck = false;
+    protected int retryAttempts=0;
+    protected int keepAliveCount = Integer.MAX_VALUE;
+    protected Selector selector;
+    protected HashMap nioSenders = new HashMap();
+    protected boolean directBuf = false;
+    protected int rxBufSize = 43800;
+    protected int txBufSize = 25188;
+    public ParallelNioSender(long timeout, 
+                             boolean waitForAck,
+                             int retryAttempts,
+                             boolean directBuf,
+                             int rxBufSize,
+                             int txBufSize) throws IOException {
         this.timeout = timeout;
         this.waitForAck = waitForAck;
+        this.retryAttempts = retryAttempts;
+        selector = Selector.open();
+        this.directBuf = directBuf;
+        this.rxBufSize = rxBufSize;
+        this.txBufSize = txBufSize;
     }
     
     
-    public synchronized void sendMessage(Member mbr, ChannelMessage msg) 
throws ChannelException {
+    public synchronized void sendMessage(Member[] destination, ChannelMessage 
msg) throws ChannelException {
         long start = System.currentTimeMillis();
         byte[] data = XByteBuffer.createDataPackage((ClusterData)msg);
+        NioSender[] senders = setupForSend(destination);
+        connect(senders);
+        setData(senders,data);
+        int remaining = senders.length;
+        try {
+            //loop until complete, an error happens, or we timeout
+            long delta = System.currentTimeMillis() - start;
+            while ( (remaining>0) && (delta<timeout) ) {
+                remaining -= doLoop(selectTimeout,retryAttempts);
+            }
+            if ( remaining > 0 ) {
+                //timeout has occured
+                ChannelException cx = new ChannelException("Operation has 
timed out("+timeout+" ms.).");
+                for (int i=0; i<senders.length; i++ ) {
+                    if (!senders[i].isComplete() ) 
cx.addFaultyMember(senders[i].getDestination());
+                }
+                throw cx;
+            }
+        } catch (Exception x ) {
+            try { this.close(); } catch (Exception ignore) {}
+            if ( x instanceof ChannelException ) throw (ChannelException)x;
+            else throw new ChannelException(x);
+        }
         
-        
     }
     
-    protected synchronized
+    private int doLoop(long selectTimeOut, int maxAttempts) throws 
IOException, ChannelException {
+        int completed = 0;
+        int selectedKeys = selector.select(selectTimeOut);
+        if (selectedKeys == 0) {
+            return 0;
+        }
+        Iterator it = selector.selectedKeys().iterator();
+        while (it.hasNext()) {
+            SelectionKey sk = (SelectionKey) it.next();
+            it.remove();
+            int readyOps = sk.readyOps();
+            sk.interestOps(sk.interestOps() & ~readyOps);
+            NioSender sender = (NioSender) sk.attachment();
+            try {
+                if (sender.process(sk)) {
+                    sender.reset();
+                    completed++;
+                    sender.setComplete(true);
+                }//end if
+            } catch (Exception x) {
+                byte[] data = sender.getMessage();
+                int attempt = sender.getAttempt()+1;
+                if ( sender.getAttempt() >= maxAttempts && maxAttempts>0 ) {
+                    try { 
+                        sender.disconnect(); 
+                        sender.connect();
+                        sender.setAttempt(attempt);
+                        sender.setMessage(data);
+                    }catch ( Exception ignore){
+                        //dont report the error on a resend
+                    }
+                } else {
+                    ChannelException cx = new ChannelException(x);
+                    cx.addFaultyMember(sender.getDestination());
+                    throw cx;
+                }//end if
+            }
+        }
+        return completed;
+
+    }
+    
+    private void connect(NioSender[] senders) throws ChannelException {
+        ChannelException x = null;
+        for (int i=0; i<senders.length; i++ ) {
+            try {
+                if (!senders[i].isConnected()) senders[i].connect();
+            }catch ( IOException io ) {
+                if ( x==null ) x = new ChannelException(io);
+                x.addFaultyMember(senders[i].getDestination());
+            }
+        }
+        if ( x != null ) throw x;
+    }
+    
+    private void setData(NioSender[] senders, byte[] data) throws 
ChannelException {
+        ChannelException x = null;
+        for (int i=0; i<senders.length; i++ ) {
+            try {
+                senders[i].setMessage(data);
+            }catch ( IOException io ) {
+                if ( x==null ) x = new ChannelException(io);
+                x.addFaultyMember(senders[i].getDestination());
+            }
+        }
+        if ( x != null ) throw x;
+    }
+    
+    
+    private NioSender[] setupForSend(Member[] destination) {
+        NioSender[] result = new NioSender[destination.length];
+        for ( int i=0; i<destination.length; i++ ) {
+            NioSender sender = (NioSender)nioSenders.get(destination[i]);
+            if ( sender == null ) {
+                sender = new NioSender(destination[i]);
+                nioSenders.put(destination[i],sender);
+            }
+            sender.reset();
+            sender.setSelector(selector);
+            sender.setDirect(directBuf);
+            sender.setRxBufSize(rxBufSize);
+            sender.setTxBufSize(txBufSize);
+            sender.setWaitForAck(waitForAck);
+            result[i] = sender;
+        }
+        return result;
+    }
+    
+    public synchronized void close() throws ChannelException  {
+        ChannelException x = null;
+        Object[] members = nioSenders.keySet().toArray();
+        for (int i=0; i<members.length; i++ ) {
+            Member mbr = (Member)members[i];
+            try {
+                NioSender sender = (NioSender)nioSenders.get(mbr);
+                sender.disconnect();
+            }catch ( Exception e ) {
+                if ( x == null ) x = new ChannelException(e);
+                x.addFaultyMember(mbr);
+            }
+            nioSenders.remove(mbr);
+        }
+        if ( x != null ) throw x;
+    }
+    
+    public void finalize() {
+        try {close(); }catch ( Exception ignore){}
+    }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to